Integrate FastMCP

This commit integrates FastMCP, a high-level MCP server implementation originally written by Jeremiah Lowin,
into the official MCP SDK. It also updates dependencies and adds new dev dependencies.
It moves the existing SDK into a .lowlevel .
This commit is contained in:
David Soria Parra
2024-12-09 16:16:47 +00:00
parent e98291e001
commit 557e90d2e7
41 changed files with 4875 additions and 521 deletions

View File

View File

View File

@@ -0,0 +1,194 @@
from pydantic import FileUrl
import pytest
from mcp.server.fastmcp.prompts.base import (
Prompt,
UserMessage,
TextContent,
AssistantMessage,
Message,
)
from mcp.types import EmbeddedResource, TextResourceContents
class TestRenderPrompt:
async def test_basic_fn(self):
def fn() -> str:
return "Hello, world!"
prompt = Prompt.from_function(fn)
assert await prompt.render() == [
UserMessage(content=TextContent(type="text", text="Hello, world!"))
]
async def test_async_fn(self):
async def fn() -> str:
return "Hello, world!"
prompt = Prompt.from_function(fn)
assert await prompt.render() == [
UserMessage(content=TextContent(type="text", text="Hello, world!"))
]
async def test_fn_with_args(self):
async def fn(name: str, age: int = 30) -> str:
return f"Hello, {name}! You're {age} years old."
prompt = Prompt.from_function(fn)
assert await prompt.render(arguments=dict(name="World")) == [
UserMessage(
content=TextContent(
type="text", text="Hello, World! You're 30 years old."
)
)
]
async def test_fn_with_invalid_kwargs(self):
async def fn(name: str, age: int = 30) -> str:
return f"Hello, {name}! You're {age} years old."
prompt = Prompt.from_function(fn)
with pytest.raises(ValueError):
await prompt.render(arguments=dict(age=40))
async def test_fn_returns_message(self):
async def fn() -> UserMessage:
return UserMessage(content="Hello, world!")
prompt = Prompt.from_function(fn)
assert await prompt.render() == [
UserMessage(content=TextContent(type="text", text="Hello, world!"))
]
async def test_fn_returns_assistant_message(self):
async def fn() -> AssistantMessage:
return AssistantMessage(
content=TextContent(type="text", text="Hello, world!")
)
prompt = Prompt.from_function(fn)
assert await prompt.render() == [
AssistantMessage(content=TextContent(type="text", text="Hello, world!"))
]
async def test_fn_returns_multiple_messages(self):
expected = [
UserMessage("Hello, world!"),
AssistantMessage("How can I help you today?"),
UserMessage("I'm looking for a restaurant in the center of town."),
]
async def fn() -> list[Message]:
return expected
prompt = Prompt.from_function(fn)
assert await prompt.render() == expected
async def test_fn_returns_list_of_strings(self):
expected = [
"Hello, world!",
"I'm looking for a restaurant in the center of town.",
]
async def fn() -> list[str]:
return expected
prompt = Prompt.from_function(fn)
assert await prompt.render() == [UserMessage(t) for t in expected]
async def test_fn_returns_resource_content(self):
"""Test returning a message with resource content."""
async def fn() -> UserMessage:
return UserMessage(
content=EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri=FileUrl("file://file.txt"),
text="File contents",
mimeType="text/plain",
),
)
)
prompt = Prompt.from_function(fn)
assert await prompt.render() == [
UserMessage(
content=EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri=FileUrl("file://file.txt"),
text="File contents",
mimeType="text/plain",
),
)
)
]
async def test_fn_returns_mixed_content(self):
"""Test returning messages with mixed content types."""
async def fn() -> list[Message]:
return [
UserMessage(content="Please analyze this file:"),
UserMessage(
content=EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri=FileUrl("file://file.txt"),
text="File contents",
mimeType="text/plain",
),
)
),
AssistantMessage(content="I'll help analyze that file."),
]
prompt = Prompt.from_function(fn)
assert await prompt.render() == [
UserMessage(
content=TextContent(type="text", text="Please analyze this file:")
),
UserMessage(
content=EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri=FileUrl("file://file.txt"),
text="File contents",
mimeType="text/plain",
),
)
),
AssistantMessage(
content=TextContent(type="text", text="I'll help analyze that file.")
),
]
async def test_fn_returns_dict_with_resource(self):
"""Test returning a dict with resource content."""
async def fn() -> dict:
return {
"role": "user",
"content": {
"type": "resource",
"resource": {
"uri": FileUrl("file://file.txt"),
"text": "File contents",
"mimeType": "text/plain",
},
},
}
prompt = Prompt.from_function(fn)
assert await prompt.render() == [
UserMessage(
content=EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri=FileUrl("file://file.txt"),
text="File contents",
mimeType="text/plain",
),
)
)
]

View File

@@ -0,0 +1,107 @@
import pytest
from mcp.server.fastmcp.prompts.base import UserMessage, TextContent, Prompt
from mcp.server.fastmcp.prompts.manager import PromptManager
class TestPromptManager:
def test_add_prompt(self):
"""Test adding a prompt to the manager."""
def fn() -> str:
return "Hello, world!"
manager = PromptManager()
prompt = Prompt.from_function(fn)
added = manager.add_prompt(prompt)
assert added == prompt
assert manager.get_prompt("fn") == prompt
def test_add_duplicate_prompt(self, caplog):
"""Test adding the same prompt twice."""
def fn() -> str:
return "Hello, world!"
manager = PromptManager()
prompt = Prompt.from_function(fn)
first = manager.add_prompt(prompt)
second = manager.add_prompt(prompt)
assert first == second
assert "Prompt already exists" in caplog.text
def test_disable_warn_on_duplicate_prompts(self, caplog):
"""Test disabling warning on duplicate prompts."""
def fn() -> str:
return "Hello, world!"
manager = PromptManager(warn_on_duplicate_prompts=False)
prompt = Prompt.from_function(fn)
first = manager.add_prompt(prompt)
second = manager.add_prompt(prompt)
assert first == second
assert "Prompt already exists" not in caplog.text
def test_list_prompts(self):
"""Test listing all prompts."""
def fn1() -> str:
return "Hello, world!"
def fn2() -> str:
return "Goodbye, world!"
manager = PromptManager()
prompt1 = Prompt.from_function(fn1)
prompt2 = Prompt.from_function(fn2)
manager.add_prompt(prompt1)
manager.add_prompt(prompt2)
prompts = manager.list_prompts()
assert len(prompts) == 2
assert prompts == [prompt1, prompt2]
async def test_render_prompt(self):
"""Test rendering a prompt."""
def fn() -> str:
return "Hello, world!"
manager = PromptManager()
prompt = Prompt.from_function(fn)
manager.add_prompt(prompt)
messages = await manager.render_prompt("fn")
assert messages == [
UserMessage(content=TextContent(type="text", text="Hello, world!"))
]
async def test_render_prompt_with_args(self):
"""Test rendering a prompt with arguments."""
def fn(name: str) -> str:
return f"Hello, {name}!"
manager = PromptManager()
prompt = Prompt.from_function(fn)
manager.add_prompt(prompt)
messages = await manager.render_prompt("fn", arguments={"name": "World"})
assert messages == [
UserMessage(content=TextContent(type="text", text="Hello, World!"))
]
async def test_render_unknown_prompt(self):
"""Test rendering a non-existent prompt."""
manager = PromptManager()
with pytest.raises(ValueError, match="Unknown prompt: unknown"):
await manager.render_prompt("unknown")
async def test_render_prompt_with_missing_args(self):
"""Test rendering a prompt with missing required arguments."""
def fn(name: str) -> str:
return f"Hello, {name}!"
manager = PromptManager()
prompt = Prompt.from_function(fn)
manager.add_prompt(prompt)
with pytest.raises(ValueError, match="Missing required arguments"):
await manager.render_prompt("fn")

View File

@@ -0,0 +1,115 @@
import os
import pytest
from pathlib import Path
from tempfile import NamedTemporaryFile
from pydantic import FileUrl
from mcp.server.fastmcp.resources import FileResource
@pytest.fixture
def temp_file():
"""Create a temporary file for testing.
File is automatically cleaned up after the test if it still exists.
"""
content = "test content"
with NamedTemporaryFile(mode="w", delete=False) as f:
f.write(content)
path = Path(f.name).resolve()
yield path
try:
path.unlink()
except FileNotFoundError:
pass # File was already deleted by the test
class TestFileResource:
"""Test FileResource functionality."""
def test_file_resource_creation(self, temp_file: Path):
"""Test creating a FileResource."""
resource = FileResource(
uri=FileUrl(temp_file.as_uri()),
name="test",
description="test file",
path=temp_file,
)
assert str(resource.uri) == temp_file.as_uri()
assert resource.name == "test"
assert resource.description == "test file"
assert resource.mime_type == "text/plain" # default
assert resource.path == temp_file
assert resource.is_binary is False # default
def test_file_resource_str_path_conversion(self, temp_file: Path):
"""Test FileResource handles string paths."""
resource = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test",
path=Path(str(temp_file)),
)
assert isinstance(resource.path, Path)
assert resource.path.is_absolute()
async def test_read_text_file(self, temp_file: Path):
"""Test reading a text file."""
resource = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test",
path=temp_file,
)
content = await resource.read()
assert content == "test content"
assert resource.mime_type == "text/plain"
async def test_read_binary_file(self, temp_file: Path):
"""Test reading a file as binary."""
resource = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test",
path=temp_file,
is_binary=True,
)
content = await resource.read()
assert isinstance(content, bytes)
assert content == b"test content"
def test_relative_path_error(self):
"""Test error on relative path."""
with pytest.raises(ValueError, match="Path must be absolute"):
FileResource(
uri=FileUrl("file:///test.txt"),
name="test",
path=Path("test.txt"),
)
async def test_missing_file_error(self, temp_file: Path):
"""Test error when file doesn't exist."""
# Create path to non-existent file
missing = temp_file.parent / "missing.txt"
resource = FileResource(
uri=FileUrl("file:///missing.txt"),
name="test",
path=missing,
)
with pytest.raises(ValueError, match="Error reading file"):
await resource.read()
@pytest.mark.skipif(
os.name == "nt", reason="File permissions behave differently on Windows"
)
async def test_permission_error(self, temp_file: Path):
"""Test reading a file without permissions."""
temp_file.chmod(0o000) # Remove all permissions
try:
resource = FileResource(
uri=FileUrl(temp_file.as_uri()),
name="test",
path=temp_file,
)
with pytest.raises(ValueError, match="Error reading file"):
await resource.read()
finally:
temp_file.chmod(0o644) # Restore permissions

View File

@@ -0,0 +1,115 @@
from pydantic import BaseModel, AnyUrl
import pytest
from mcp.server.fastmcp.resources import FunctionResource
class TestFunctionResource:
"""Test FunctionResource functionality."""
def test_function_resource_creation(self):
"""Test creating a FunctionResource."""
def my_func() -> str:
return "test content"
resource = FunctionResource(
uri=AnyUrl("fn://test"),
name="test",
description="test function",
fn=my_func,
)
assert str(resource.uri) == "fn://test"
assert resource.name == "test"
assert resource.description == "test function"
assert resource.mime_type == "text/plain" # default
assert resource.fn == my_func
async def test_read_text(self):
"""Test reading text from a FunctionResource."""
def get_data() -> str:
return "Hello, world!"
resource = FunctionResource(
uri=AnyUrl("function://test"),
name="test",
fn=get_data,
)
content = await resource.read()
assert content == "Hello, world!"
assert resource.mime_type == "text/plain"
async def test_read_binary(self):
"""Test reading binary data from a FunctionResource."""
def get_data() -> bytes:
return b"Hello, world!"
resource = FunctionResource(
uri=AnyUrl("function://test"),
name="test",
fn=get_data,
)
content = await resource.read()
assert content == b"Hello, world!"
async def test_json_conversion(self):
"""Test automatic JSON conversion of non-string results."""
def get_data() -> dict:
return {"key": "value"}
resource = FunctionResource(
uri=AnyUrl("function://test"),
name="test",
fn=get_data,
)
content = await resource.read()
assert isinstance(content, str)
assert '"key": "value"' in content
async def test_error_handling(self):
"""Test error handling in FunctionResource."""
def failing_func() -> str:
raise ValueError("Test error")
resource = FunctionResource(
uri=AnyUrl("function://test"),
name="test",
fn=failing_func,
)
with pytest.raises(ValueError, match="Error reading resource function://test"):
await resource.read()
async def test_basemodel_conversion(self):
"""Test handling of BaseModel types."""
class MyModel(BaseModel):
name: str
resource = FunctionResource(
uri=AnyUrl("function://test"),
name="test",
fn=lambda: MyModel(name="test"),
)
content = await resource.read()
assert content == '{"name": "test"}'
async def test_custom_type_conversion(self):
"""Test handling of custom types."""
class CustomData:
def __str__(self) -> str:
return "custom data"
def get_data() -> CustomData:
return CustomData()
resource = FunctionResource(
uri=AnyUrl("function://test"),
name="test",
fn=get_data,
)
content = await resource.read()
assert isinstance(content, str)

View File

@@ -0,0 +1,137 @@
import pytest
from pathlib import Path
from tempfile import NamedTemporaryFile
from pydantic import AnyUrl, FileUrl
from mcp.server.fastmcp.resources import (
FileResource,
FunctionResource,
ResourceManager,
ResourceTemplate,
)
@pytest.fixture
def temp_file():
"""Create a temporary file for testing.
File is automatically cleaned up after the test if it still exists.
"""
content = "test content"
with NamedTemporaryFile(mode="w", delete=False) as f:
f.write(content)
path = Path(f.name).resolve()
yield path
try:
path.unlink()
except FileNotFoundError:
pass # File was already deleted by the test
class TestResourceManager:
"""Test ResourceManager functionality."""
def test_add_resource(self, temp_file: Path):
"""Test adding a resource."""
manager = ResourceManager()
resource = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test",
path=temp_file,
)
added = manager.add_resource(resource)
assert added == resource
assert manager.list_resources() == [resource]
def test_add_duplicate_resource(self, temp_file: Path):
"""Test adding the same resource twice."""
manager = ResourceManager()
resource = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test",
path=temp_file,
)
first = manager.add_resource(resource)
second = manager.add_resource(resource)
assert first == second
assert manager.list_resources() == [resource]
def test_warn_on_duplicate_resources(self, temp_file: Path, caplog):
"""Test warning on duplicate resources."""
manager = ResourceManager()
resource = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test",
path=temp_file,
)
manager.add_resource(resource)
manager.add_resource(resource)
assert "Resource already exists" in caplog.text
def test_disable_warn_on_duplicate_resources(self, temp_file: Path, caplog):
"""Test disabling warning on duplicate resources."""
manager = ResourceManager(warn_on_duplicate_resources=False)
resource = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test",
path=temp_file,
)
manager.add_resource(resource)
manager.add_resource(resource)
assert "Resource already exists" not in caplog.text
async def test_get_resource(self, temp_file: Path):
"""Test getting a resource by URI."""
manager = ResourceManager()
resource = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test",
path=temp_file,
)
manager.add_resource(resource)
retrieved = await manager.get_resource(resource.uri)
assert retrieved == resource
async def test_get_resource_from_template(self):
"""Test getting a resource through a template."""
manager = ResourceManager()
def greet(name: str) -> str:
return f"Hello, {name}!"
template = ResourceTemplate.from_function(
fn=greet,
uri_template="greet://{name}",
name="greeter",
)
manager._templates[template.uri_template] = template
resource = await manager.get_resource(AnyUrl("greet://world"))
assert isinstance(resource, FunctionResource)
content = await resource.read()
assert content == "Hello, world!"
async def test_get_unknown_resource(self):
"""Test getting a non-existent resource."""
manager = ResourceManager()
with pytest.raises(ValueError, match="Unknown resource"):
await manager.get_resource(AnyUrl("unknown://test"))
def test_list_resources(self, temp_file: Path):
"""Test listing all resources."""
manager = ResourceManager()
resource1 = FileResource(
uri=FileUrl(f"file://{temp_file}"),
name="test1",
path=temp_file,
)
resource2 = FileResource(
uri=FileUrl(f"file://{temp_file}2"),
name="test2",
path=temp_file,
)
manager.add_resource(resource1)
manager.add_resource(resource2)
resources = manager.list_resources()
assert len(resources) == 2
assert resources == [resource1, resource2]

View File

@@ -0,0 +1,181 @@
import json
import pytest
from pydantic import BaseModel
from mcp.server.fastmcp.resources import FunctionResource, ResourceTemplate
class TestResourceTemplate:
"""Test ResourceTemplate functionality."""
def test_template_creation(self):
"""Test creating a template from a function."""
def my_func(key: str, value: int) -> dict:
return {"key": key, "value": value}
template = ResourceTemplate.from_function(
fn=my_func,
uri_template="test://{key}/{value}",
name="test",
)
assert template.uri_template == "test://{key}/{value}"
assert template.name == "test"
assert template.mime_type == "text/plain" # default
test_input = {"key": "test", "value": 42}
assert template.fn(**test_input) == my_func(**test_input)
def test_template_matches(self):
"""Test matching URIs against a template."""
def my_func(key: str, value: int) -> dict:
return {"key": key, "value": value}
template = ResourceTemplate.from_function(
fn=my_func,
uri_template="test://{key}/{value}",
name="test",
)
# Valid match
params = template.matches("test://foo/123")
assert params == {"key": "foo", "value": "123"}
# No match
assert template.matches("test://foo") is None
assert template.matches("other://foo/123") is None
async def test_create_resource(self):
"""Test creating a resource from a template."""
def my_func(key: str, value: int) -> dict:
return {"key": key, "value": value}
template = ResourceTemplate.from_function(
fn=my_func,
uri_template="test://{key}/{value}",
name="test",
)
resource = await template.create_resource(
"test://foo/123",
{"key": "foo", "value": 123},
)
assert isinstance(resource, FunctionResource)
content = await resource.read()
assert isinstance(content, str)
data = json.loads(content)
assert data == {"key": "foo", "value": 123}
async def test_template_error(self):
"""Test error handling in template resource creation."""
def failing_func(x: str) -> str:
raise ValueError("Test error")
template = ResourceTemplate.from_function(
fn=failing_func,
uri_template="fail://{x}",
name="fail",
)
with pytest.raises(ValueError, match="Error creating resource from template"):
await template.create_resource("fail://test", {"x": "test"})
async def test_async_text_resource(self):
"""Test creating a text resource from async function."""
async def greet(name: str) -> str:
return f"Hello, {name}!"
template = ResourceTemplate.from_function(
fn=greet,
uri_template="greet://{name}",
name="greeter",
)
resource = await template.create_resource(
"greet://world",
{"name": "world"},
)
assert isinstance(resource, FunctionResource)
content = await resource.read()
assert content == "Hello, world!"
async def test_async_binary_resource(self):
"""Test creating a binary resource from async function."""
async def get_bytes(value: str) -> bytes:
return value.encode()
template = ResourceTemplate.from_function(
fn=get_bytes,
uri_template="bytes://{value}",
name="bytes",
)
resource = await template.create_resource(
"bytes://test",
{"value": "test"},
)
assert isinstance(resource, FunctionResource)
content = await resource.read()
assert content == b"test"
async def test_basemodel_conversion(self):
"""Test handling of BaseModel types."""
class MyModel(BaseModel):
key: str
value: int
def get_data(key: str, value: int) -> MyModel:
return MyModel(key=key, value=value)
template = ResourceTemplate.from_function(
fn=get_data,
uri_template="test://{key}/{value}",
name="test",
)
resource = await template.create_resource(
"test://foo/123",
{"key": "foo", "value": 123},
)
assert isinstance(resource, FunctionResource)
content = await resource.read()
assert isinstance(content, str)
data = json.loads(content)
assert data == {"key": "foo", "value": 123}
async def test_custom_type_conversion(self):
"""Test handling of custom types."""
class CustomData:
def __init__(self, value: str):
self.value = value
def __str__(self) -> str:
return self.value
def get_data(value: str) -> CustomData:
return CustomData(value)
template = ResourceTemplate.from_function(
fn=get_data,
uri_template="test://{value}",
name="test",
)
resource = await template.create_resource(
"test://hello",
{"value": "hello"},
)
assert isinstance(resource, FunctionResource)
content = await resource.read()
assert content == "hello"

View File

@@ -0,0 +1,100 @@
import pytest
from pydantic import AnyUrl
from mcp.server.fastmcp.resources import FunctionResource, Resource
class TestResourceValidation:
"""Test base Resource validation."""
def test_resource_uri_validation(self):
"""Test URI validation."""
def dummy_func() -> str:
return "data"
# Valid URI
resource = FunctionResource(
uri=AnyUrl("http://example.com/data"),
name="test",
fn=dummy_func,
)
assert str(resource.uri) == "http://example.com/data"
# Missing protocol
with pytest.raises(ValueError, match="Input should be a valid URL"):
FunctionResource(
uri=AnyUrl("invalid"),
name="test",
fn=dummy_func,
)
# Missing host
with pytest.raises(ValueError, match="Input should be a valid URL"):
FunctionResource(
uri=AnyUrl("http://"),
name="test",
fn=dummy_func,
)
def test_resource_name_from_uri(self):
"""Test name is extracted from URI if not provided."""
def dummy_func() -> str:
return "data"
resource = FunctionResource(
uri=AnyUrl("resource://my-resource"),
fn=dummy_func,
)
assert resource.name == "resource://my-resource"
def test_resource_name_validation(self):
"""Test name validation."""
def dummy_func() -> str:
return "data"
# Must provide either name or URI
with pytest.raises(ValueError, match="Either name or uri must be provided"):
FunctionResource(
fn=dummy_func,
)
# Explicit name takes precedence over URI
resource = FunctionResource(
uri=AnyUrl("resource://uri-name"),
name="explicit-name",
fn=dummy_func,
)
assert resource.name == "explicit-name"
def test_resource_mime_type(self):
"""Test mime type handling."""
def dummy_func() -> str:
return "data"
# Default mime type
resource = FunctionResource(
uri=AnyUrl("resource://test"),
fn=dummy_func,
)
assert resource.mime_type == "text/plain"
# Custom mime type
resource = FunctionResource(
uri=AnyUrl("resource://test"),
fn=dummy_func,
mime_type="application/json",
)
assert resource.mime_type == "application/json"
async def test_resource_read_abstract(self):
"""Test that Resource.read() is abstract."""
class ConcreteResource(Resource):
pass
with pytest.raises(TypeError, match="abstract method"):
ConcreteResource(uri=AnyUrl("test://test"), name="test") # type: ignore

View File

View File

@@ -0,0 +1,114 @@
import json
from mcp.server.fastmcp import FastMCP
import pytest
from pathlib import Path
@pytest.fixture()
def test_dir(tmp_path_factory) -> Path:
"""Create a temporary directory with test files."""
tmp = tmp_path_factory.mktemp("test_files")
# Create test files
(tmp / "example.py").write_text("print('hello world')")
(tmp / "readme.md").write_text("# Test Directory\nThis is a test.")
(tmp / "config.json").write_text('{"test": true}')
return tmp
@pytest.fixture
def mcp() -> FastMCP:
mcp = FastMCP()
return mcp
@pytest.fixture(autouse=True)
def resources(mcp: FastMCP, test_dir: Path) -> FastMCP:
@mcp.resource("dir://test_dir")
def list_test_dir() -> list[str]:
"""List the files in the test directory"""
return [str(f) for f in test_dir.iterdir()]
@mcp.resource("file://test_dir/example.py")
def read_example_py() -> str:
"""Read the example.py file"""
try:
return (test_dir / "example.py").read_text()
except FileNotFoundError:
return "File not found"
@mcp.resource("file://test_dir/readme.md")
def read_readme_md() -> str:
"""Read the readme.md file"""
try:
return (test_dir / "readme.md").read_text()
except FileNotFoundError:
return "File not found"
@mcp.resource("file://test_dir/config.json")
def read_config_json() -> str:
"""Read the config.json file"""
try:
return (test_dir / "config.json").read_text()
except FileNotFoundError:
return "File not found"
return mcp
@pytest.fixture(autouse=True)
def tools(mcp: FastMCP, test_dir: Path) -> FastMCP:
@mcp.tool()
def delete_file(path: str) -> bool:
# ensure path is in test_dir
if Path(path).resolve().parent != test_dir:
raise ValueError(f"Path must be in test_dir: {path}")
Path(path).unlink()
return True
return mcp
async def test_list_resources(mcp: FastMCP):
resources = await mcp.list_resources()
assert len(resources) == 4
assert [str(r.uri) for r in resources] == [
"dir://test_dir",
"file://test_dir/example.py",
"file://test_dir/readme.md",
"file://test_dir/config.json",
]
async def test_read_resource_dir(mcp: FastMCP):
files = await mcp.read_resource("dir://test_dir")
files = json.loads(files)
assert sorted([Path(f).name for f in files]) == [
"config.json",
"example.py",
"readme.md",
]
async def test_read_resource_file(mcp: FastMCP):
result = await mcp.read_resource("file://test_dir/example.py")
assert result == "print('hello world')"
async def test_delete_file(mcp: FastMCP, test_dir: Path):
await mcp.call_tool(
"delete_file", arguments=dict(path=str(test_dir / "example.py"))
)
assert not (test_dir / "example.py").exists()
async def test_delete_file_and_check_resources(mcp: FastMCP, test_dir: Path):
await mcp.call_tool(
"delete_file", arguments=dict(path=str(test_dir / "example.py"))
)
result = await mcp.read_resource("file://test_dir/example.py")
assert result == "File not found"

View File

@@ -0,0 +1,361 @@
from typing import Annotated
import annotated_types
import pytest
from pydantic import BaseModel, Field
from mcp.server.fastmcp.utilities.func_metadata import func_metadata
class SomeInputModelA(BaseModel):
pass
class SomeInputModelB(BaseModel):
class InnerModel(BaseModel):
x: int
how_many_shrimp: Annotated[int, Field(description="How many shrimp in the tank???")]
ok: InnerModel
y: None
def complex_arguments_fn(
an_int: int,
must_be_none: None,
must_be_none_dumb_annotation: Annotated[None, "blah"],
list_of_ints: list[int],
# list[str] | str is an interesting case because if it comes in as JSON like
# "[\"a\", \"b\"]" then it will be naively parsed as a string.
list_str_or_str: list[str] | str,
an_int_annotated_with_field: Annotated[
int, Field(description="An int with a field")
],
an_int_annotated_with_field_and_others: Annotated[
int,
str, # Should be ignored, really
Field(description="An int with a field"),
annotated_types.Gt(1),
],
an_int_annotated_with_junk: Annotated[
int,
"123",
456,
],
field_with_default_via_field_annotation_before_nondefault_arg: Annotated[
int, Field(1)
],
unannotated,
my_model_a: SomeInputModelA,
my_model_a_forward_ref: "SomeInputModelA",
my_model_b: SomeInputModelB,
an_int_annotated_with_field_default: Annotated[
int,
Field(1, description="An int with a field"),
],
unannotated_with_default=5,
my_model_a_with_default: SomeInputModelA = SomeInputModelA(), # noqa: B008
an_int_with_default: int = 1,
must_be_none_with_default: None = None,
an_int_with_equals_field: int = Field(1, ge=0),
int_annotated_with_default: Annotated[int, Field(description="hey")] = 5,
) -> str:
_ = (
an_int,
must_be_none,
must_be_none_dumb_annotation,
list_of_ints,
list_str_or_str,
an_int_annotated_with_field,
an_int_annotated_with_field_and_others,
an_int_annotated_with_junk,
field_with_default_via_field_annotation_before_nondefault_arg,
unannotated,
an_int_annotated_with_field_default,
unannotated_with_default,
my_model_a,
my_model_a_forward_ref,
my_model_b,
my_model_a_with_default,
an_int_with_default,
must_be_none_with_default,
an_int_with_equals_field,
int_annotated_with_default,
)
return "ok!"
async def test_complex_function_runtime_arg_validation_non_json():
"""Test that basic non-JSON arguments are validated correctly"""
meta = func_metadata(complex_arguments_fn)
# Test with minimum required arguments
result = await meta.call_fn_with_arg_validation(
complex_arguments_fn,
fn_is_async=False,
arguments_to_validate={
"an_int": 1,
"must_be_none": None,
"must_be_none_dumb_annotation": None,
"list_of_ints": [1, 2, 3],
"list_str_or_str": "hello",
"an_int_annotated_with_field": 42,
"an_int_annotated_with_field_and_others": 5,
"an_int_annotated_with_junk": 100,
"unannotated": "test",
"my_model_a": {},
"my_model_a_forward_ref": {},
"my_model_b": {"how_many_shrimp": 5, "ok": {"x": 1}, "y": None},
},
arguments_to_pass_directly=None,
)
assert result == "ok!"
# Test with invalid types
with pytest.raises(ValueError):
await meta.call_fn_with_arg_validation(
complex_arguments_fn,
fn_is_async=False,
arguments_to_validate={"an_int": "not an int"},
arguments_to_pass_directly=None,
)
async def test_complex_function_runtime_arg_validation_with_json():
"""Test that JSON string arguments are parsed and validated correctly"""
meta = func_metadata(complex_arguments_fn)
result = await meta.call_fn_with_arg_validation(
complex_arguments_fn,
fn_is_async=False,
arguments_to_validate={
"an_int": 1,
"must_be_none": None,
"must_be_none_dumb_annotation": None,
"list_of_ints": "[1, 2, 3]", # JSON string
"list_str_or_str": '["a", "b", "c"]', # JSON string
"an_int_annotated_with_field": 42,
"an_int_annotated_with_field_and_others": "5", # JSON string
"an_int_annotated_with_junk": 100,
"unannotated": "test",
"my_model_a": "{}", # JSON string
"my_model_a_forward_ref": "{}", # JSON string
"my_model_b": '{"how_many_shrimp": 5, "ok": {"x": 1}, "y": null}', # JSON string
},
arguments_to_pass_directly=None,
)
assert result == "ok!"
def test_str_vs_list_str():
"""Test handling of string vs list[str] type annotations.
This is tricky as '"hello"' can be parsed as a JSON string or a Python string.
We want to make sure it's kept as a python string.
"""
def func_with_str_types(str_or_list: str | list[str]):
return str_or_list
meta = func_metadata(func_with_str_types)
# Test string input for union type
result = meta.pre_parse_json({"str_or_list": "hello"})
assert result["str_or_list"] == "hello"
# Test string input that contains valid JSON for union type
# We want to see here that the JSON-vali string is NOT parsed as JSON, but rather
# kept as a raw string
result = meta.pre_parse_json({"str_or_list": '"hello"'})
assert result["str_or_list"] == '"hello"'
# Test list input for union type
result = meta.pre_parse_json({"str_or_list": '["hello", "world"]'})
assert result["str_or_list"] == ["hello", "world"]
def test_skip_names():
"""Test that skipped parameters are not included in the model"""
def func_with_many_params(
keep_this: int, skip_this: str, also_keep: float, also_skip: bool
):
return keep_this, skip_this, also_keep, also_skip
# Skip some parameters
meta = func_metadata(func_with_many_params, skip_names=["skip_this", "also_skip"])
# Check model fields
assert "keep_this" in meta.arg_model.model_fields
assert "also_keep" in meta.arg_model.model_fields
assert "skip_this" not in meta.arg_model.model_fields
assert "also_skip" not in meta.arg_model.model_fields
# Validate that we can call with only non-skipped parameters
model: BaseModel = meta.arg_model.model_validate({"keep_this": 1, "also_keep": 2.5}) # type: ignore
assert model.keep_this == 1 # type: ignore
assert model.also_keep == 2.5 # type: ignore
async def test_lambda_function():
"""Test lambda function schema and validation"""
fn = lambda x, y=5: x # noqa: E731
meta = func_metadata(lambda x, y=5: x)
# Test schema
assert meta.arg_model.model_json_schema() == {
"properties": {
"x": {"title": "x", "type": "string"},
"y": {"default": 5, "title": "y", "type": "string"},
},
"required": ["x"],
"title": "<lambda>Arguments",
"type": "object",
}
async def check_call(args):
return await meta.call_fn_with_arg_validation(
fn,
fn_is_async=False,
arguments_to_validate=args,
arguments_to_pass_directly=None,
)
# Basic calls
assert await check_call({"x": "hello"}) == "hello"
assert await check_call({"x": "hello", "y": "world"}) == "hello"
assert await check_call({"x": '"hello"'}) == '"hello"'
# Missing required arg
with pytest.raises(ValueError):
await check_call({"y": "world"})
def test_complex_function_json_schema():
meta = func_metadata(complex_arguments_fn)
assert meta.arg_model.model_json_schema() == {
"$defs": {
"InnerModel": {
"properties": {"x": {"title": "X", "type": "integer"}},
"required": ["x"],
"title": "InnerModel",
"type": "object",
},
"SomeInputModelA": {
"properties": {},
"title": "SomeInputModelA",
"type": "object",
},
"SomeInputModelB": {
"properties": {
"how_many_shrimp": {
"description": "How many shrimp in the tank???",
"title": "How Many Shrimp",
"type": "integer",
},
"ok": {"$ref": "#/$defs/InnerModel"},
"y": {"title": "Y", "type": "null"},
},
"required": ["how_many_shrimp", "ok", "y"],
"title": "SomeInputModelB",
"type": "object",
},
},
"properties": {
"an_int": {"title": "An Int", "type": "integer"},
"must_be_none": {"title": "Must Be None", "type": "null"},
"must_be_none_dumb_annotation": {
"title": "Must Be None Dumb Annotation",
"type": "null",
},
"list_of_ints": {
"items": {"type": "integer"},
"title": "List Of Ints",
"type": "array",
},
"list_str_or_str": {
"anyOf": [
{"items": {"type": "string"}, "type": "array"},
{"type": "string"},
],
"title": "List Str Or Str",
},
"an_int_annotated_with_field": {
"description": "An int with a field",
"title": "An Int Annotated With Field",
"type": "integer",
},
"an_int_annotated_with_field_and_others": {
"description": "An int with a field",
"exclusiveMinimum": 1,
"title": "An Int Annotated With Field And Others",
"type": "integer",
},
"an_int_annotated_with_junk": {
"title": "An Int Annotated With Junk",
"type": "integer",
},
"field_with_default_via_field_annotation_before_nondefault_arg": {
"default": 1,
"title": "Field With Default Via Field Annotation Before Nondefault Arg",
"type": "integer",
},
"unannotated": {"title": "unannotated", "type": "string"},
"my_model_a": {"$ref": "#/$defs/SomeInputModelA"},
"my_model_a_forward_ref": {"$ref": "#/$defs/SomeInputModelA"},
"my_model_b": {"$ref": "#/$defs/SomeInputModelB"},
"an_int_annotated_with_field_default": {
"default": 1,
"description": "An int with a field",
"title": "An Int Annotated With Field Default",
"type": "integer",
},
"unannotated_with_default": {
"default": 5,
"title": "unannotated_with_default",
"type": "string",
},
"my_model_a_with_default": {
"$ref": "#/$defs/SomeInputModelA",
"default": {},
},
"an_int_with_default": {
"default": 1,
"title": "An Int With Default",
"type": "integer",
},
"must_be_none_with_default": {
"default": None,
"title": "Must Be None With Default",
"type": "null",
},
"an_int_with_equals_field": {
"default": 1,
"minimum": 0,
"title": "An Int With Equals Field",
"type": "integer",
},
"int_annotated_with_default": {
"default": 5,
"description": "hey",
"title": "Int Annotated With Default",
"type": "integer",
},
},
"required": [
"an_int",
"must_be_none",
"must_be_none_dumb_annotation",
"list_of_ints",
"list_str_or_str",
"an_int_annotated_with_field",
"an_int_annotated_with_field_and_others",
"an_int_annotated_with_junk",
"unannotated",
"my_model_a",
"my_model_a_forward_ref",
"my_model_b",
],
"title": "complex_arguments_fnArguments",
"type": "object",
}

View File

@@ -0,0 +1,656 @@
import base64
from pathlib import Path
from typing import TYPE_CHECKING, Union
import pytest
from mcp.shared.exceptions import McpError
from mcp.shared.memory import (
create_connected_server_and_client_session as client_session,
)
from mcp.types import (
ImageContent,
TextContent,
TextResourceContents,
BlobResourceContents,
)
from pydantic import AnyUrl
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.fastmcp.prompts.base import EmbeddedResource, Message, UserMessage
from mcp.server.fastmcp.resources import FileResource, FunctionResource
from mcp.server.fastmcp.utilities.types import Image
if TYPE_CHECKING:
from mcp.server.fastmcp import Context
class TestServer:
async def test_create_server(self):
mcp = FastMCP()
assert mcp.name == "FastMCP"
async def test_add_tool_decorator(self):
mcp = FastMCP()
@mcp.tool()
def add(x: int, y: int) -> int:
return x + y
assert len(mcp._tool_manager.list_tools()) == 1
async def test_add_tool_decorator_incorrect_usage(self):
mcp = FastMCP()
with pytest.raises(TypeError, match="The @tool decorator was used incorrectly"):
@mcp.tool # Missing parentheses #type: ignore
def add(x: int, y: int) -> int:
return x + y
async def test_add_resource_decorator(self):
mcp = FastMCP()
@mcp.resource("r://{x}")
def get_data(x: str) -> str:
return f"Data: {x}"
assert len(mcp._resource_manager._templates) == 1
async def test_add_resource_decorator_incorrect_usage(self):
mcp = FastMCP()
with pytest.raises(
TypeError, match="The @resource decorator was used incorrectly"
):
@mcp.resource # Missing parentheses #type: ignore
def get_data(x: str) -> str:
return f"Data: {x}"
def tool_fn(x: int, y: int) -> int:
return x + y
def error_tool_fn() -> None:
raise ValueError("Test error")
def image_tool_fn(path: str) -> Image:
return Image(path)
def mixed_content_tool_fn() -> list[Union[TextContent, ImageContent]]:
return [
TextContent(type="text", text="Hello"),
ImageContent(type="image", data="abc", mimeType="image/png"),
]
class TestServerTools:
async def test_add_tool(self):
mcp = FastMCP()
mcp.add_tool(tool_fn)
mcp.add_tool(tool_fn)
assert len(mcp._tool_manager.list_tools()) == 1
async def test_list_tools(self):
mcp = FastMCP()
mcp.add_tool(tool_fn)
async with client_session(mcp._mcp_server) as client:
tools = await client.list_tools()
assert len(tools.tools) == 1
async def test_call_tool(self):
mcp = FastMCP()
mcp.add_tool(tool_fn)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("my_tool", {"arg1": "value"})
assert not hasattr(result, "error")
assert len(result.content) > 0
async def test_tool_exception_handling(self):
mcp = FastMCP()
mcp.add_tool(error_tool_fn)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("error_tool_fn", {})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, TextContent)
assert "Test error" in content.text
assert result.isError is True
async def test_tool_error_handling(self):
mcp = FastMCP()
mcp.add_tool(error_tool_fn)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("error_tool_fn", {})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, TextContent)
assert "Test error" in content.text
assert result.isError is True
async def test_tool_error_details(self):
"""Test that exception details are properly formatted in the response"""
mcp = FastMCP()
mcp.add_tool(error_tool_fn)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("error_tool_fn", {})
content = result.content[0]
assert isinstance(content, TextContent)
assert isinstance(content.text, str)
assert "Test error" in content.text
assert result.isError is True
async def test_tool_return_value_conversion(self):
mcp = FastMCP()
mcp.add_tool(tool_fn)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("tool_fn", {"x": 1, "y": 2})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, TextContent)
assert content.text == "3"
async def test_tool_image_helper(self, tmp_path: Path):
# Create a test image
image_path = tmp_path / "test.png"
image_path.write_bytes(b"fake png data")
mcp = FastMCP()
mcp.add_tool(image_tool_fn)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("image_tool_fn", {"path": str(image_path)})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, ImageContent)
assert content.type == "image"
assert content.mimeType == "image/png"
# Verify base64 encoding
decoded = base64.b64decode(content.data)
assert decoded == b"fake png data"
async def test_tool_mixed_content(self):
mcp = FastMCP()
mcp.add_tool(mixed_content_tool_fn)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("mixed_content_tool_fn", {})
assert len(result.content) == 2
content1 = result.content[0]
content2 = result.content[1]
assert isinstance(content1, TextContent)
assert content1.text == "Hello"
assert isinstance(content2, ImageContent)
assert content2.mimeType == "image/png"
assert content2.data == "abc"
async def test_tool_mixed_list_with_image(self, tmp_path: Path):
"""Test that lists containing Image objects and other types are handled correctly"""
# Create a test image
image_path = tmp_path / "test.png"
image_path.write_bytes(b"test image data")
def mixed_list_fn() -> list:
return [
"text message",
Image(image_path),
{"key": "value"},
TextContent(type="text", text="direct content"),
]
mcp = FastMCP()
mcp.add_tool(mixed_list_fn)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("mixed_list_fn", {})
assert len(result.content) == 4
# Check text conversion
content1 = result.content[0]
assert isinstance(content1, TextContent)
assert content1.text == "text message"
# Check image conversion
content2 = result.content[1]
assert isinstance(content2, ImageContent)
assert content2.mimeType == "image/png"
assert base64.b64decode(content2.data) == b"test image data"
# Check dict conversion
content3 = result.content[2]
assert isinstance(content3, TextContent)
assert '"key": "value"' in content3.text
# Check direct TextContent
content4 = result.content[3]
assert isinstance(content4, TextContent)
assert content4.text == "direct content"
class TestServerResources:
async def test_text_resource(self):
mcp = FastMCP()
def get_text():
return "Hello, world!"
resource = FunctionResource(
uri=AnyUrl("resource://test"), name="test", fn=get_text
)
mcp.add_resource(resource)
async with client_session(mcp._mcp_server) as client:
result = await client.read_resource(AnyUrl("resource://test"))
assert isinstance(result.contents[0], TextResourceContents)
assert result.contents[0].text == "Hello, world!"
async def test_binary_resource(self):
mcp = FastMCP()
def get_binary():
return b"Binary data"
resource = FunctionResource(
uri=AnyUrl("resource://binary"),
name="binary",
fn=get_binary,
mime_type="application/octet-stream",
)
mcp.add_resource(resource)
async with client_session(mcp._mcp_server) as client:
result = await client.read_resource(AnyUrl("resource://binary"))
assert isinstance(result.contents[0], BlobResourceContents)
assert result.contents[0].blob == base64.b64encode(b"Binary data").decode()
async def test_file_resource_text(self, tmp_path: Path):
mcp = FastMCP()
# Create a text file
text_file = tmp_path / "test.txt"
text_file.write_text("Hello from file!")
resource = FileResource(
uri=AnyUrl("file://test.txt"), name="test.txt", path=text_file
)
mcp.add_resource(resource)
async with client_session(mcp._mcp_server) as client:
result = await client.read_resource(AnyUrl("file://test.txt"))
assert isinstance(result.contents[0], TextResourceContents)
assert result.contents[0].text == "Hello from file!"
async def test_file_resource_binary(self, tmp_path: Path):
mcp = FastMCP()
# Create a binary file
binary_file = tmp_path / "test.bin"
binary_file.write_bytes(b"Binary file data")
resource = FileResource(
uri=AnyUrl("file://test.bin"),
name="test.bin",
path=binary_file,
mime_type="application/octet-stream",
)
mcp.add_resource(resource)
async with client_session(mcp._mcp_server) as client:
result = await client.read_resource(AnyUrl("file://test.bin"))
assert isinstance(result.contents[0], BlobResourceContents)
assert (
result.contents[0].blob
== base64.b64encode(b"Binary file data").decode()
)
class TestServerResourceTemplates:
async def test_resource_with_params(self):
"""Test that a resource with function parameters raises an error if the URI
parameters don't match"""
mcp = FastMCP()
with pytest.raises(ValueError, match="Mismatch between URI parameters"):
@mcp.resource("resource://data")
def get_data_fn(param: str) -> str:
return f"Data: {param}"
async def test_resource_with_uri_params(self):
"""Test that a resource with URI parameters is automatically a template"""
mcp = FastMCP()
with pytest.raises(ValueError, match="Mismatch between URI parameters"):
@mcp.resource("resource://{param}")
def get_data() -> str:
return "Data"
async def test_resource_with_untyped_params(self):
"""Test that a resource with untyped parameters raises an error"""
mcp = FastMCP()
@mcp.resource("resource://{param}")
def get_data(param) -> str:
return "Data"
async def test_resource_matching_params(self):
"""Test that a resource with matching URI and function parameters works"""
mcp = FastMCP()
@mcp.resource("resource://{name}/data")
def get_data(name: str) -> str:
return f"Data for {name}"
async with client_session(mcp._mcp_server) as client:
result = await client.read_resource(AnyUrl("resource://test/data"))
assert isinstance(result.contents[0], TextResourceContents)
assert result.contents[0].text == "Data for test"
async def test_resource_mismatched_params(self):
"""Test that mismatched parameters raise an error"""
mcp = FastMCP()
with pytest.raises(ValueError, match="Mismatch between URI parameters"):
@mcp.resource("resource://{name}/data")
def get_data(user: str) -> str:
return f"Data for {user}"
async def test_resource_multiple_params(self):
"""Test that multiple parameters work correctly"""
mcp = FastMCP()
@mcp.resource("resource://{org}/{repo}/data")
def get_data(org: str, repo: str) -> str:
return f"Data for {org}/{repo}"
async with client_session(mcp._mcp_server) as client:
result = await client.read_resource(
AnyUrl("resource://cursor/fastmcp/data")
)
assert isinstance(result.contents[0], TextResourceContents)
assert result.contents[0].text == "Data for cursor/fastmcp"
async def test_resource_multiple_mismatched_params(self):
"""Test that mismatched parameters raise an error"""
mcp = FastMCP()
with pytest.raises(ValueError, match="Mismatch between URI parameters"):
@mcp.resource("resource://{org}/{repo}/data")
def get_data_mismatched(org: str, repo_2: str) -> str:
return f"Data for {org}"
"""Test that a resource with no parameters works as a regular resource"""
mcp = FastMCP()
@mcp.resource("resource://static")
def get_static_data() -> str:
return "Static data"
async with client_session(mcp._mcp_server) as client:
result = await client.read_resource(AnyUrl("resource://static"))
assert isinstance(result.contents[0], TextResourceContents)
assert result.contents[0].text == "Static data"
async def test_template_to_resource_conversion(self):
"""Test that templates are properly converted to resources when accessed"""
mcp = FastMCP()
@mcp.resource("resource://{name}/data")
def get_data(name: str) -> str:
return f"Data for {name}"
# Should be registered as a template
assert len(mcp._resource_manager._templates) == 1
assert len(await mcp.list_resources()) == 0
# When accessed, should create a concrete resource
resource = await mcp._resource_manager.get_resource("resource://test/data")
assert isinstance(resource, FunctionResource)
result = await resource.read()
assert result == "Data for test"
class TestContextInjection:
"""Test context injection in tools."""
async def test_context_detection(self):
"""Test that context parameters are properly detected."""
mcp = FastMCP()
def tool_with_context(x: int, ctx: Context) -> str:
return f"Request {ctx.request_id}: {x}"
tool = mcp._tool_manager.add_tool(tool_with_context)
assert tool.context_kwarg == "ctx"
async def test_context_injection(self):
"""Test that context is properly injected into tool calls."""
mcp = FastMCP()
def tool_with_context(x: int, ctx: Context) -> str:
assert ctx.request_id is not None
return f"Request {ctx.request_id}: {x}"
mcp.add_tool(tool_with_context)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("tool_with_context", {"x": 42})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, TextContent)
assert "Request" in content.text
assert "42" in content.text
async def test_async_context(self):
"""Test that context works in async functions."""
mcp = FastMCP()
async def async_tool(x: int, ctx: Context) -> str:
assert ctx.request_id is not None
return f"Async request {ctx.request_id}: {x}"
mcp.add_tool(async_tool)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("async_tool", {"x": 42})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, TextContent)
assert "Async request" in content.text
assert "42" in content.text
async def test_context_logging(self):
"""Test that context logging methods work."""
mcp = FastMCP()
def logging_tool(msg: str, ctx: Context) -> str:
ctx.debug("Debug message")
ctx.info("Info message")
ctx.warning("Warning message")
ctx.error("Error message")
return f"Logged messages for {msg}"
mcp.add_tool(logging_tool)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("logging_tool", {"msg": "test"})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, TextContent)
assert "Logged messages for test" in content.text
async def test_optional_context(self):
"""Test that context is optional."""
mcp = FastMCP()
def no_context(x: int) -> int:
return x * 2
mcp.add_tool(no_context)
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("no_context", {"x": 21})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, TextContent)
assert content.text == "42"
async def test_context_resource_access(self):
"""Test that context can access resources."""
mcp = FastMCP()
@mcp.resource("test://data")
def test_resource() -> str:
return "resource data"
@mcp.tool()
async def tool_with_resource(ctx: Context) -> str:
data = await ctx.read_resource("test://data")
return f"Read resource: {data}"
async with client_session(mcp._mcp_server) as client:
result = await client.call_tool("tool_with_resource", {})
assert len(result.content) == 1
content = result.content[0]
assert isinstance(content, TextContent)
assert "Read resource: resource data" in content.text
class TestServerPrompts:
"""Test prompt functionality in FastMCP server."""
async def test_prompt_decorator(self):
"""Test that the prompt decorator registers prompts correctly."""
mcp = FastMCP()
@mcp.prompt()
def fn() -> str:
return "Hello, world!"
prompts = mcp._prompt_manager.list_prompts()
assert len(prompts) == 1
assert prompts[0].name == "fn"
# Don't compare functions directly since validate_call wraps them
content = await prompts[0].render()
assert isinstance(content[0].content, TextContent)
assert content[0].content.text == "Hello, world!"
async def test_prompt_decorator_with_name(self):
"""Test prompt decorator with custom name."""
mcp = FastMCP()
@mcp.prompt(name="custom_name")
def fn() -> str:
return "Hello, world!"
prompts = mcp._prompt_manager.list_prompts()
assert len(prompts) == 1
assert prompts[0].name == "custom_name"
content = await prompts[0].render()
assert isinstance(content[0].content, TextContent)
assert content[0].content.text == "Hello, world!"
async def test_prompt_decorator_with_description(self):
"""Test prompt decorator with custom description."""
mcp = FastMCP()
@mcp.prompt(description="A custom description")
def fn() -> str:
return "Hello, world!"
prompts = mcp._prompt_manager.list_prompts()
assert len(prompts) == 1
assert prompts[0].description == "A custom description"
content = await prompts[0].render()
assert isinstance(content[0].content, TextContent)
assert content[0].content.text == "Hello, world!"
def test_prompt_decorator_error(self):
"""Test error when decorator is used incorrectly."""
mcp = FastMCP()
with pytest.raises(TypeError, match="decorator was used incorrectly"):
@mcp.prompt # type: ignore
def fn() -> str:
return "Hello, world!"
async def test_list_prompts(self):
"""Test listing prompts through MCP protocol."""
mcp = FastMCP()
@mcp.prompt()
def fn(name: str, optional: str = "default") -> str:
return f"Hello, {name}!"
async with client_session(mcp._mcp_server) as client:
result = await client.list_prompts()
assert result.prompts is not None
assert len(result.prompts) == 1
prompt = result.prompts[0]
assert prompt.name == "fn"
assert prompt.arguments is not None
assert len(prompt.arguments) == 2
assert prompt.arguments[0].name == "name"
assert prompt.arguments[0].required is True
assert prompt.arguments[1].name == "optional"
assert prompt.arguments[1].required is False
async def test_get_prompt(self):
"""Test getting a prompt through MCP protocol."""
mcp = FastMCP()
@mcp.prompt()
def fn(name: str) -> str:
return f"Hello, {name}!"
async with client_session(mcp._mcp_server) as client:
result = await client.get_prompt("fn", {"name": "World"})
assert len(result.messages) == 1
message = result.messages[0]
assert message.role == "user"
content = message.content
assert isinstance(content, TextContent)
assert content.text == "Hello, World!"
async def test_get_prompt_with_resource(self):
"""Test getting a prompt that returns resource content."""
mcp = FastMCP()
@mcp.prompt()
def fn() -> Message:
return UserMessage(
content=EmbeddedResource(
type="resource",
resource=TextResourceContents(
uri=AnyUrl("file://file.txt"),
text="File contents",
mimeType="text/plain",
),
)
)
async with client_session(mcp._mcp_server) as client:
result = await client.get_prompt("fn")
assert len(result.messages) == 1
message = result.messages[0]
assert message.role == "user"
content = message.content
assert isinstance(content, EmbeddedResource)
resource = content.resource
assert isinstance(resource, TextResourceContents)
assert resource.text == "File contents"
assert resource.mimeType == "text/plain"
async def test_get_unknown_prompt(self):
"""Test error when getting unknown prompt."""
mcp = FastMCP()
async with client_session(mcp._mcp_server) as client:
with pytest.raises(McpError, match="Unknown prompt"):
await client.get_prompt("unknown")
async def test_get_prompt_missing_args(self):
"""Test error when required arguments are missing."""
mcp = FastMCP()
@mcp.prompt()
def prompt_fn(name: str) -> str:
return f"Hello, {name}!"
async with client_session(mcp._mcp_server) as client:
with pytest.raises(McpError, match="Missing required arguments"):
await client.get_prompt("prompt_fn")

View File

@@ -0,0 +1,306 @@
import logging
from typing import Optional
import pytest
from pydantic import BaseModel
import json
from mcp.server.fastmcp.exceptions import ToolError
from mcp.server.fastmcp.tools import ToolManager
class TestAddTools:
def test_basic_function(self):
"""Test registering and running a basic function."""
def add(a: int, b: int) -> int:
"""Add two numbers."""
return a + b
manager = ToolManager()
manager.add_tool(add)
tool = manager.get_tool("add")
assert tool is not None
assert tool.name == "add"
assert tool.description == "Add two numbers."
assert tool.is_async is False
assert tool.parameters["properties"]["a"]["type"] == "integer"
assert tool.parameters["properties"]["b"]["type"] == "integer"
async def test_async_function(self):
"""Test registering and running an async function."""
async def fetch_data(url: str) -> str:
"""Fetch data from URL."""
return f"Data from {url}"
manager = ToolManager()
manager.add_tool(fetch_data)
tool = manager.get_tool("fetch_data")
assert tool is not None
assert tool.name == "fetch_data"
assert tool.description == "Fetch data from URL."
assert tool.is_async is True
assert tool.parameters["properties"]["url"]["type"] == "string"
def test_pydantic_model_function(self):
"""Test registering a function that takes a Pydantic model."""
class UserInput(BaseModel):
name: str
age: int
def create_user(user: UserInput, flag: bool) -> dict:
"""Create a new user."""
return {"id": 1, **user.model_dump()}
manager = ToolManager()
manager.add_tool(create_user)
tool = manager.get_tool("create_user")
assert tool is not None
assert tool.name == "create_user"
assert tool.description == "Create a new user."
assert tool.is_async is False
assert "name" in tool.parameters["$defs"]["UserInput"]["properties"]
assert "age" in tool.parameters["$defs"]["UserInput"]["properties"]
assert "flag" in tool.parameters["properties"]
def test_add_invalid_tool(self):
manager = ToolManager()
with pytest.raises(AttributeError):
manager.add_tool(1) # type: ignore
def test_add_lambda(self):
manager = ToolManager()
tool = manager.add_tool(lambda x: x, name="my_tool")
assert tool.name == "my_tool"
def test_add_lambda_with_no_name(self):
manager = ToolManager()
with pytest.raises(
ValueError, match="You must provide a name for lambda functions"
):
manager.add_tool(lambda x: x)
def test_warn_on_duplicate_tools(self, caplog):
"""Test warning on duplicate tools."""
def f(x: int) -> int:
return x
manager = ToolManager()
manager.add_tool(f)
with caplog.at_level(logging.WARNING):
manager.add_tool(f)
assert "Tool already exists: f" in caplog.text
def test_disable_warn_on_duplicate_tools(self, caplog):
"""Test disabling warning on duplicate tools."""
def f(x: int) -> int:
return x
manager = ToolManager()
manager.add_tool(f)
manager.warn_on_duplicate_tools = False
with caplog.at_level(logging.WARNING):
manager.add_tool(f)
assert "Tool already exists: f" not in caplog.text
class TestCallTools:
async def test_call_tool(self):
def add(a: int, b: int) -> int:
"""Add two numbers."""
return a + b
manager = ToolManager()
manager.add_tool(add)
result = await manager.call_tool("add", {"a": 1, "b": 2})
assert result == 3
async def test_call_async_tool(self):
async def double(n: int) -> int:
"""Double a number."""
return n * 2
manager = ToolManager()
manager.add_tool(double)
result = await manager.call_tool("double", {"n": 5})
assert result == 10
async def test_call_tool_with_default_args(self):
def add(a: int, b: int = 1) -> int:
"""Add two numbers."""
return a + b
manager = ToolManager()
manager.add_tool(add)
result = await manager.call_tool("add", {"a": 1})
assert result == 2
async def test_call_tool_with_missing_args(self):
def add(a: int, b: int) -> int:
"""Add two numbers."""
return a + b
manager = ToolManager()
manager.add_tool(add)
with pytest.raises(ToolError):
await manager.call_tool("add", {"a": 1})
async def test_call_unknown_tool(self):
manager = ToolManager()
with pytest.raises(ToolError):
await manager.call_tool("unknown", {"a": 1})
async def test_call_tool_with_list_int_input(self):
def sum_vals(vals: list[int]) -> int:
return sum(vals)
manager = ToolManager()
manager.add_tool(sum_vals)
# Try both with plain list and with JSON list
result = await manager.call_tool("sum_vals", {"vals": "[1, 2, 3]"})
assert result == 6
result = await manager.call_tool("sum_vals", {"vals": [1, 2, 3]})
assert result == 6
async def test_call_tool_with_list_str_or_str_input(self):
def concat_strs(vals: list[str] | str) -> str:
return vals if isinstance(vals, str) else "".join(vals)
manager = ToolManager()
manager.add_tool(concat_strs)
# Try both with plain python object and with JSON list
result = await manager.call_tool("concat_strs", {"vals": ["a", "b", "c"]})
assert result == "abc"
result = await manager.call_tool("concat_strs", {"vals": '["a", "b", "c"]'})
assert result == "abc"
result = await manager.call_tool("concat_strs", {"vals": "a"})
assert result == "a"
result = await manager.call_tool("concat_strs", {"vals": '"a"'})
assert result == '"a"'
async def test_call_tool_with_complex_model(self):
from mcp.server.fastmcp import Context
class MyShrimpTank(BaseModel):
class Shrimp(BaseModel):
name: str
shrimp: list[Shrimp]
x: None
def name_shrimp(tank: MyShrimpTank, ctx: Context) -> list[str]:
return [x.name for x in tank.shrimp]
manager = ToolManager()
manager.add_tool(name_shrimp)
result = await manager.call_tool(
"name_shrimp",
{"tank": {"x": None, "shrimp": [{"name": "rex"}, {"name": "gertrude"}]}},
)
assert result == ["rex", "gertrude"]
result = await manager.call_tool(
"name_shrimp",
{"tank": '{"x": null, "shrimp": [{"name": "rex"}, {"name": "gertrude"}]}'},
)
assert result == ["rex", "gertrude"]
class TestToolSchema:
async def test_context_arg_excluded_from_schema(self):
from mcp.server.fastmcp import Context
def something(a: int, ctx: Context) -> int:
return a
manager = ToolManager()
tool = manager.add_tool(something)
assert "ctx" not in json.dumps(tool.parameters)
assert "Context" not in json.dumps(tool.parameters)
assert "ctx" not in tool.fn_metadata.arg_model.model_fields
class TestContextHandling:
"""Test context handling in the tool manager."""
def test_context_parameter_detection(self):
"""Test that context parameters are properly detected in Tool.from_function()."""
from mcp.server.fastmcp import Context
def tool_with_context(x: int, ctx: Context) -> str:
return str(x)
manager = ToolManager()
tool = manager.add_tool(tool_with_context)
assert tool.context_kwarg == "ctx"
def tool_without_context(x: int) -> str:
return str(x)
tool = manager.add_tool(tool_without_context)
assert tool.context_kwarg is None
async def test_context_injection(self):
"""Test that context is properly injected during tool execution."""
from mcp.server.fastmcp import Context, FastMCP
def tool_with_context(x: int, ctx: Context) -> str:
assert isinstance(ctx, Context)
return str(x)
manager = ToolManager()
manager.add_tool(tool_with_context)
mcp = FastMCP()
ctx = mcp.get_context()
result = await manager.call_tool("tool_with_context", {"x": 42}, context=ctx)
assert result == "42"
async def test_context_injection_async(self):
"""Test that context is properly injected in async tools."""
from mcp.server.fastmcp import Context, FastMCP
async def async_tool(x: int, ctx: Context) -> str:
assert isinstance(ctx, Context)
return str(x)
manager = ToolManager()
manager.add_tool(async_tool)
mcp = FastMCP()
ctx = mcp.get_context()
result = await manager.call_tool("async_tool", {"x": 42}, context=ctx)
assert result == "42"
async def test_context_optional(self):
"""Test that context is optional when calling tools."""
from mcp.server.fastmcp import Context
def tool_with_context(x: int, ctx: Optional[Context] = None) -> str:
return str(x)
manager = ToolManager()
manager.add_tool(tool_with_context)
# Should not raise an error when context is not provided
result = await manager.call_tool("tool_with_context", {"x": 42})
assert result == "42"
async def test_context_error_handling(self):
"""Test error handling when context injection fails."""
from mcp.server.fastmcp import Context, FastMCP
def tool_with_context(x: int, ctx: Context) -> str:
raise ValueError("Test error")
manager = ToolManager()
manager.add_tool(tool_with_context)
mcp = FastMCP()
ctx = mcp.get_context()
with pytest.raises(ToolError, match="Error executing tool tool_with_context"):
await manager.call_tool("tool_with_context", {"x": 42}, context=ctx)