Add example client to examples/clients folder

This commit is contained in:
3choff
2024-12-11 16:06:10 +00:00
parent aaf32b5307
commit a0216c3e50
8 changed files with 619 additions and 0 deletions

View File

@@ -0,0 +1 @@
3.10

View File

@@ -0,0 +1,110 @@
# MCP Simple Chatbot
This example demonstrates how to integrate the Model Context Protocol (MCP) into a simple CLI chatbot. The implementation showcases MCP's flexibility by supporting multiple tools through MCP servers and is compatible with any LLM provider that follows OpenAI API standards.
## Requirements
- Python 3.10
- `python-dotenv`
- `requests`
- `mcp`
- `uvicorn`
## Installation
1. **Install the dependencies:**
```bash
pip install -r requirements.txt
```
2. **Set up environment variables:**
Create a `.env` file in the root directory and add your API key:
```plaintext
LLM_API_KEY=your_api_key_here
```
3. **Configure servers:**
The `servers_config.json` follows the same structure as Claude Desktop, allowing for easy integration of multiple servers.
Here's an example:
```json
{
"mcpServers": {
"sqlite": {
"command": "uvx",
"args": ["mcp-server-sqlite", "--db-path", "./test.db"]
},
"puppeteer": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-puppeteer"]
}
}
}
```
Environment variables are supported as well. Pass them as you would with the Claude Desktop App.
Example:
```json
{
"mcpServers": {
"server_name": {
"command": "uvx",
"args": ["mcp-server-name", "--additional-args"],
"env": {
"API_KEY": "your_api_key_here"
}
}
}
}
```
## Usage
1. **Run the client:**
```bash
python main.py
```
2. **Interact with the assistant:**
The assistant will automatically detect available tools and can respond to queries based on the tools provided by the configured servers.
3. **Exit the session:**
Type `quit` or `exit` to end the session.
## Architecture
- **Tool Discovery**: Tools are automatically discovered from configured servers.
- **System Prompt**: Tools are dynamically included in the system prompt, allowing the LLM to understand available capabilities.
- **Server Integration**: Supports any MCP-compatible server, tested with various server implementations including Uvicorn and Node.js.
### Class Structure
- **Configuration**: Manages environment variables and server configurations
- **Server**: Handles MCP server initialization, tool discovery, and execution
- **Tool**: Represents individual tools with their properties and formatting
- **LLMClient**: Manages communication with the LLM provider
- **ChatSession**: Orchestrates the interaction between user, LLM, and tools
### Logic Flow
1. **Tool Integration**:
- Tools are dynamically discovered from MCP servers
- Tool descriptions are automatically included in system prompt
- Tool execution is handled through standardized MCP protocol
2. **Runtime Flow**:
- User input is received
- Input is sent to LLM with context of available tools
- LLM response is parsed:
- If it's a tool call → execute tool and return result
- If it's a direct response → return to user
- Tool results are sent back to LLM for interpretation
- Final response is presented to user

View File

@@ -0,0 +1 @@
GROQ_API_KEY=gsk_1234567890

View File

@@ -0,0 +1,443 @@
import asyncio
import json
import logging
import os
import shutil
from typing import Any, Dict, List, Optional
import requests
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
class Configuration:
"""Manages configuration and environment variables for the MCP client."""
def __init__(self) -> None:
"""Initialize configuration with environment variables."""
self.load_env()
self.api_key = os.getenv("GROQ_API_KEY")
@staticmethod
def load_env() -> None:
"""Load environment variables from .env file."""
load_dotenv()
@staticmethod
def load_config(file_path: str) -> Dict[str, Any]:
"""Load server configuration from JSON file.
Args:
file_path: Path to the JSON configuration file.
Returns:
Dict containing server configuration.
Raises:
FileNotFoundError: If configuration file doesn't exist.
JSONDecodeError: If configuration file is invalid JSON.
"""
with open(file_path, "r") as f:
return json.load(f)
@property
def llm_api_key(self) -> str:
"""Get the LLM API key.
Returns:
The API key as a string.
Raises:
ValueError: If the API key is not found in environment variables.
"""
if not self.api_key:
raise ValueError("LLM_API_KEY not found in environment variables")
return self.api_key
class Server:
"""Manages MCP server connections and tool execution."""
def __init__(self, name: str, config: Dict[str, Any]) -> None:
self.name: str = name
self.config: Dict[str, Any] = config
self.stdio_context: Optional[Any] = None
self.session: Optional[ClientSession] = None
self._cleanup_lock: asyncio.Lock = asyncio.Lock()
async def initialize(self) -> None:
"""Initialize the server connection."""
command = (
shutil.which("npx")
if self.config["command"] == "npx"
else self.config["command"]
)
if command is None:
raise ValueError("The command must be a valid string and cannot be None.")
server_params = StdioServerParameters(
command=command,
args=self.config["args"],
env={**os.environ, **self.config["env"]}
if self.config.get("env")
else None,
)
try:
self.stdio_context = stdio_client(server_params)
read, write = await self.stdio_context.__aenter__()
self.session = ClientSession(read, write)
await self.session.__aenter__()
await self.session.initialize()
except Exception as e:
logging.error(f"Error initializing server {self.name}: {e}")
await self.cleanup()
raise
async def list_tools(self) -> List[Any]:
"""List available tools from the server.
Returns:
A list of available tools.
Raises:
RuntimeError: If the server is not initialized.
"""
if not self.session:
raise RuntimeError(f"Server {self.name} not initialized")
tools_response = await self.session.list_tools()
tools = []
for item in tools_response:
if isinstance(item, tuple) and item[0] == "tools":
for tool in item[1]:
tools.append(Tool(tool.name, tool.description, tool.inputSchema))
return tools
async def execute_tool(
self,
tool_name: str,
arguments: Dict[str, Any],
retries: int = 2,
delay: float = 1.0,
) -> Any:
"""Execute a tool with retry mechanism.
Args:
tool_name: Name of the tool to execute.
arguments: Tool arguments.
retries: Number of retry attempts.
delay: Delay between retries in seconds.
Returns:
Tool execution result.
Raises:
RuntimeError: If server is not initialized.
Exception: If tool execution fails after all retries.
"""
if not self.session:
raise RuntimeError(f"Server {self.name} not initialized")
attempt = 0
while attempt < retries:
try:
logging.info(f"Executing {tool_name}...")
result = await self.session.call_tool(tool_name, arguments)
return result
except Exception as e:
attempt += 1
logging.warning(
f"Error executing tool: {e}. Attempt {attempt} of {retries}."
)
if attempt < retries:
logging.info(f"Retrying in {delay} seconds...")
await asyncio.sleep(delay)
else:
logging.error("Max retries reached. Failing.")
raise
async def cleanup(self) -> None:
"""Clean up server resources."""
async with self._cleanup_lock:
try:
if self.session:
try:
await self.session.__aexit__(None, None, None)
except Exception as e:
logging.warning(
f"Warning during session cleanup for {self.name}: {e}"
)
finally:
self.session = None
if self.stdio_context:
try:
await self.stdio_context.__aexit__(None, None, None)
except (RuntimeError, asyncio.CancelledError) as e:
logging.info(
f"Note: Normal shutdown message for {self.name}: {e}"
)
except Exception as e:
logging.warning(
f"Warning during stdio cleanup for {self.name}: {e}"
)
finally:
self.stdio_context = None
except Exception as e:
logging.error(f"Error during cleanup of server {self.name}: {e}")
class Tool:
"""Represents a tool with its properties and formatting."""
def __init__(
self, name: str, description: str, input_schema: Dict[str, Any]
) -> None:
self.name: str = name
self.description: str = description
self.input_schema: Dict[str, Any] = input_schema
def format_for_llm(self) -> str:
"""Format tool information for LLM.
Returns:
A formatted string describing the tool.
"""
args_desc = []
if "properties" in self.input_schema:
for param_name, param_info in self.input_schema["properties"].items():
arg_desc = (
f"- {param_name}: {param_info.get('description', 'No description')}"
)
if param_name in self.input_schema.get("required", []):
arg_desc += " (required)"
args_desc.append(arg_desc)
return f"""
Tool: {self.name}
Description: {self.description}
Arguments:
{chr(10).join(args_desc)}
"""
class LLMClient:
"""Manages communication with the LLM provider."""
def __init__(self, api_key: str) -> None:
self.api_key: str = api_key
def get_response(self, messages: List[Dict[str, str]]) -> str:
"""Get a response from the LLM.
Args:
messages: A list of message dictionaries.
Returns:
The LLM's response as a string.
Raises:
RequestException: If the request to the LLM fails.
"""
url = "https://api.groq.com/openai/v1/chat/completions"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
payload = {
"messages": messages,
"model": "llama-3.2-90b-vision-preview",
"temperature": 0.7,
"max_tokens": 4096,
"top_p": 1,
"stream": False,
"stop": None,
}
try:
response = requests.post(url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
return data["choices"][0]["message"]["content"]
except requests.exceptions.RequestException as e:
error_message = f"Error getting LLM response: {str(e)}"
logging.error(error_message)
if e.response is not None:
status_code = e.response.status_code
logging.error(f"Status code: {status_code}")
logging.error(f"Response details: {e.response.text}")
return (
f"I encountered an error: {error_message}. "
"Please try again or rephrase your request."
)
class ChatSession:
"""Orchestrates the interaction between user, LLM, and tools."""
def __init__(self, servers: List[Server], llm_client: LLMClient) -> None:
self.servers: List[Server] = servers
self.llm_client: LLMClient = llm_client
async def cleanup_servers(self) -> None:
"""Clean up all servers properly."""
cleanup_tasks = []
for server in self.servers:
cleanup_tasks.append(asyncio.create_task(server.cleanup()))
if cleanup_tasks:
try:
await asyncio.gather(*cleanup_tasks, return_exceptions=True)
except Exception as e:
logging.warning(f"Warning during final cleanup: {e}")
async def process_llm_response(self, llm_response: str) -> str:
"""Process the LLM response and execute tools if needed.
Args:
llm_response: The response from the LLM.
Returns:
The result of tool execution or the original response.
"""
import json
try:
tool_call = json.loads(llm_response)
if "tool" in tool_call and "arguments" in tool_call:
logging.info(f"Executing tool: {tool_call['tool']}")
logging.info(f"With arguments: {tool_call['arguments']}")
for server in self.servers:
tools = await server.list_tools()
if any(tool.name == tool_call["tool"] for tool in tools):
try:
result = await server.execute_tool(
tool_call["tool"], tool_call["arguments"]
)
if isinstance(result, dict) and "progress" in result:
progress = result["progress"]
total = result["total"]
percentage = (progress / total) * 100
logging.info(
f"Progress: {progress}/{total} "
f"({percentage:.1f}%)"
)
return f"Tool execution result: {result}"
except Exception as e:
error_msg = f"Error executing tool: {str(e)}"
logging.error(error_msg)
return error_msg
return f"No server found with tool: {tool_call['tool']}"
return llm_response
except json.JSONDecodeError:
return llm_response
async def start(self) -> None:
"""Main chat session handler."""
try:
for server in self.servers:
try:
await server.initialize()
except Exception as e:
logging.error(f"Failed to initialize server: {e}")
await self.cleanup_servers()
return
all_tools = []
for server in self.servers:
tools = await server.list_tools()
all_tools.extend(tools)
tools_description = "\n".join([tool.format_for_llm() for tool in all_tools])
system_message = (
"You are a helpful assistant with access to these tools:\n\n"
f"{tools_description}\n"
"Choose the appropriate tool based on the user's question. "
"If no tool is needed, reply directly.\n\n"
"IMPORTANT: When you need to use a tool, you must ONLY respond with "
"the exact JSON object format below, nothing else:\n"
"{\n"
' "tool": "tool-name",\n'
' "arguments": {\n'
' "argument-name": "value"\n'
" }\n"
"}\n\n"
"After receiving a tool's response:\n"
"1. Transform the raw data into a natural, conversational response\n"
"2. Keep responses concise but informative\n"
"3. Focus on the most relevant information\n"
"4. Use appropriate context from the user's question\n"
"5. Avoid simply repeating the raw data\n\n"
"Please use only the tools that are explicitly defined above."
)
messages = [{"role": "system", "content": system_message}]
while True:
try:
user_input = input("You: ").strip().lower()
if user_input in ["quit", "exit"]:
logging.info("\nExiting...")
break
messages.append({"role": "user", "content": user_input})
llm_response = self.llm_client.get_response(messages)
logging.info("\nAssistant: %s", llm_response)
result = await self.process_llm_response(llm_response)
if result != llm_response:
messages.append({"role": "assistant", "content": llm_response})
messages.append({"role": "system", "content": result})
final_response = self.llm_client.get_response(messages)
logging.info("\nFinal response: %s", final_response)
messages.append(
{"role": "assistant", "content": final_response}
)
else:
messages.append({"role": "assistant", "content": llm_response})
except KeyboardInterrupt:
logging.info("\nExiting...")
break
finally:
await self.cleanup_servers()
async def main() -> None:
"""Initialize and run the chat session."""
config = Configuration()
server_config = config.load_config("servers_config.json")
servers = [
Server(name, srv_config)
for name, srv_config in server_config["mcpServers"].items()
]
llm_client = LLMClient(config.llm_api_key)
chat_session = ChatSession(servers, llm_client)
await chat_session.start()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,4 @@
python-dotenv>=1.0.0
requests>=2.31.0
mcp>=1.0.0
uvicorn>=0.32.1

View File

@@ -0,0 +1,12 @@
{
"mcpServers": {
"sqlite": {
"command": "uvx",
"args": ["mcp-server-sqlite", "--db-path", "./test.db"]
},
"puppeteer": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-puppeteer"]
}
}
}

View File

@@ -0,0 +1,48 @@
[project]
name = "mcp-simple-chatbot"
version = "0.1.0"
description = "A simple CLI chatbot using the Model Context Protocol (MCP)"
readme = "README.md"
requires-python = ">=3.10"
authors = [{ name = "Edoardo Cilia" }]
keywords = ["mcp", "llm", "chatbot", "cli"]
license = { text = "MIT" }
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
]
dependencies = [
"python-dotenv>=1.0.0",
"requests>=2.31.0",
"mcp>=1.0.0",
"uvicorn>=0.32.1"
]
[project.scripts]
mcp-simple-chatbot = "mcp_simple_chatbot.client:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["mcp_simple_chatbot"]
[tool.pyright]
include = ["mcp_simple_chatbot"]
venvPath = "."
venv = ".venv"
[tool.ruff.lint]
select = ["E", "F", "I"]
ignore = []
[tool.ruff]
line-length = 88
target-version = "py310"
[tool.uv]
dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"]