"""FastMCP - A more ergonomic interface for MCP servers.""" from __future__ import annotations as _annotations import inspect import re from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence from contextlib import ( AbstractAsyncContextManager, asynccontextmanager, ) from itertools import chain from typing import Any, Generic, Literal import anyio import pydantic_core from pydantic import BaseModel, Field from pydantic.networks import AnyUrl from pydantic_settings import BaseSettings, SettingsConfigDict from starlette.applications import Starlette from starlette.middleware import Middleware from starlette.middleware.authentication import AuthenticationMiddleware from starlette.requests import Request from starlette.responses import Response from starlette.routing import Mount, Route from starlette.types import Receive, Scope, Send from mcp.server.auth.middleware.auth_context import AuthContextMiddleware from mcp.server.auth.middleware.bearer_auth import ( BearerAuthBackend, RequireAuthMiddleware, ) from mcp.server.auth.provider import OAuthAuthorizationServerProvider from mcp.server.auth.settings import ( AuthSettings, ) from mcp.server.fastmcp.exceptions import ResourceError from mcp.server.fastmcp.prompts import Prompt, PromptManager from mcp.server.fastmcp.resources import FunctionResource, Resource, ResourceManager from mcp.server.fastmcp.tools import ToolManager from mcp.server.fastmcp.utilities.logging import configure_logging, get_logger from mcp.server.fastmcp.utilities.types import Image from mcp.server.lowlevel.helper_types import ReadResourceContents from mcp.server.lowlevel.server import LifespanResultT from mcp.server.lowlevel.server import Server as MCPServer from mcp.server.lowlevel.server import lifespan as default_lifespan from mcp.server.session import ServerSession, ServerSessionT from mcp.server.sse import SseServerTransport from mcp.server.stdio import stdio_server from mcp.shared.context import LifespanContextT, RequestContext from mcp.types import ( AnyFunction, EmbeddedResource, GetPromptResult, ImageContent, TextContent, ToolAnnotations, ) from mcp.types import Prompt as MCPPrompt from mcp.types import PromptArgument as MCPPromptArgument from mcp.types import Resource as MCPResource from mcp.types import ResourceTemplate as MCPResourceTemplate from mcp.types import Tool as MCPTool logger = get_logger(__name__) class Settings(BaseSettings, Generic[LifespanResultT]): """FastMCP server settings. All settings can be configured via environment variables with the prefix FASTMCP_. For example, FASTMCP_DEBUG=true will set debug=True. """ model_config = SettingsConfigDict( env_prefix="FASTMCP_", env_file=".env", env_nested_delimiter="__", nested_model_default_partial_update=True, extra="ignore", ) # Server settings debug: bool = False log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO" # HTTP settings host: str = "0.0.0.0" port: int = 8000 sse_path: str = "/sse" message_path: str = "/messages/" # resource settings warn_on_duplicate_resources: bool = True # tool settings warn_on_duplicate_tools: bool = True # prompt settings warn_on_duplicate_prompts: bool = True dependencies: list[str] = Field( default_factory=list, description="List of dependencies to install in the server environment", ) lifespan: ( Callable[[FastMCP], AbstractAsyncContextManager[LifespanResultT]] | None ) = Field(None, description="Lifespan context manager") auth: AuthSettings | None = None def lifespan_wrapper( app: FastMCP, lifespan: Callable[[FastMCP], AbstractAsyncContextManager[LifespanResultT]], ) -> Callable[[MCPServer[LifespanResultT]], AbstractAsyncContextManager[object]]: @asynccontextmanager async def wrap(s: MCPServer[LifespanResultT]) -> AsyncIterator[object]: async with lifespan(app) as context: yield context return wrap class FastMCP: def __init__( self, name: str | None = None, instructions: str | None = None, auth_server_provider: OAuthAuthorizationServerProvider[Any, Any, Any] | None = None, **settings: Any, ): self.settings = Settings(**settings) self._mcp_server = MCPServer( name=name or "FastMCP", instructions=instructions, lifespan=lifespan_wrapper(self, self.settings.lifespan) if self.settings.lifespan else default_lifespan, ) self._tool_manager = ToolManager( warn_on_duplicate_tools=self.settings.warn_on_duplicate_tools ) self._resource_manager = ResourceManager( warn_on_duplicate_resources=self.settings.warn_on_duplicate_resources ) self._prompt_manager = PromptManager( warn_on_duplicate_prompts=self.settings.warn_on_duplicate_prompts ) if (self.settings.auth is not None) != (auth_server_provider is not None): # TODO: after we support separate authorization servers (see # https://github.com/modelcontextprotocol/modelcontextprotocol/pull/284) # we should validate that if auth is enabled, we have either an # auth_server_provider to host our own authorization server, # OR the URL of a 3rd party authorization server. raise ValueError( "settings.auth must be specified if and only if auth_server_provider " "is specified" ) self._auth_server_provider = auth_server_provider self._custom_starlette_routes: list[Route] = [] self.dependencies = self.settings.dependencies # Set up MCP protocol handlers self._setup_handlers() # Configure logging configure_logging(self.settings.log_level) @property def name(self) -> str: return self._mcp_server.name @property def instructions(self) -> str | None: return self._mcp_server.instructions def run(self, transport: Literal["stdio", "sse"] = "stdio") -> None: """Run the FastMCP server. Note this is a synchronous function. Args: transport: Transport protocol to use ("stdio" or "sse") """ TRANSPORTS = Literal["stdio", "sse"] if transport not in TRANSPORTS.__args__: # type: ignore raise ValueError(f"Unknown transport: {transport}") if transport == "stdio": anyio.run(self.run_stdio_async) else: # transport == "sse" anyio.run(self.run_sse_async) def _setup_handlers(self) -> None: """Set up core MCP protocol handlers.""" self._mcp_server.list_tools()(self.list_tools) self._mcp_server.call_tool()(self.call_tool) self._mcp_server.list_resources()(self.list_resources) self._mcp_server.read_resource()(self.read_resource) self._mcp_server.list_prompts()(self.list_prompts) self._mcp_server.get_prompt()(self.get_prompt) self._mcp_server.list_resource_templates()(self.list_resource_templates) async def list_tools(self) -> list[MCPTool]: """List all available tools.""" tools = self._tool_manager.list_tools() return [ MCPTool( name=info.name, description=info.description, inputSchema=info.parameters, annotations=info.annotations, ) for info in tools ] def get_context(self) -> Context[ServerSession, object]: """ Returns a Context object. Note that the context will only be valid during a request; outside a request, most methods will error. """ try: request_context = self._mcp_server.request_context except LookupError: request_context = None return Context(request_context=request_context, fastmcp=self) async def call_tool( self, name: str, arguments: dict[str, Any] ) -> Sequence[TextContent | ImageContent | EmbeddedResource]: """Call a tool by name with arguments.""" context = self.get_context() result = await self._tool_manager.call_tool(name, arguments, context=context) converted_result = _convert_to_content(result) return converted_result async def list_resources(self) -> list[MCPResource]: """List all available resources.""" resources = self._resource_manager.list_resources() return [ MCPResource( uri=resource.uri, name=resource.name or "", description=resource.description, mimeType=resource.mime_type, ) for resource in resources ] async def list_resource_templates(self) -> list[MCPResourceTemplate]: templates = self._resource_manager.list_templates() return [ MCPResourceTemplate( uriTemplate=template.uri_template, name=template.name, description=template.description, ) for template in templates ] async def read_resource(self, uri: AnyUrl | str) -> Iterable[ReadResourceContents]: """Read a resource by URI.""" resource = await self._resource_manager.get_resource(uri) if not resource: raise ResourceError(f"Unknown resource: {uri}") try: content = await resource.read() return [ReadResourceContents(content=content, mime_type=resource.mime_type)] except Exception as e: logger.error(f"Error reading resource {uri}: {e}") raise ResourceError(str(e)) def add_tool( self, fn: AnyFunction, name: str | None = None, description: str | None = None, annotations: ToolAnnotations | None = None, ) -> None: """Add a tool to the server. The tool function can optionally request a Context object by adding a parameter with the Context type annotation. See the @tool decorator for examples. Args: fn: The function to register as a tool name: Optional name for the tool (defaults to function name) description: Optional description of what the tool does annotations: Optional ToolAnnotations providing additional tool information """ self._tool_manager.add_tool( fn, name=name, description=description, annotations=annotations ) def tool( self, name: str | None = None, description: str | None = None, annotations: ToolAnnotations | None = None, ) -> Callable[[AnyFunction], AnyFunction]: """Decorator to register a tool. Tools can optionally request a Context object by adding a parameter with the Context type annotation. The context provides access to MCP capabilities like logging, progress reporting, and resource access. Args: name: Optional name for the tool (defaults to function name) description: Optional description of what the tool does annotations: Optional ToolAnnotations providing additional tool information Example: @server.tool() def my_tool(x: int) -> str: return str(x) @server.tool() def tool_with_context(x: int, ctx: Context) -> str: ctx.info(f"Processing {x}") return str(x) @server.tool() async def async_tool(x: int, context: Context) -> str: await context.report_progress(50, 100) return str(x) """ # Check if user passed function directly instead of calling decorator if callable(name): raise TypeError( "The @tool decorator was used incorrectly. " "Did you forget to call it? Use @tool() instead of @tool" ) def decorator(fn: AnyFunction) -> AnyFunction: self.add_tool( fn, name=name, description=description, annotations=annotations ) return fn return decorator def add_resource(self, resource: Resource) -> None: """Add a resource to the server. Args: resource: A Resource instance to add """ self._resource_manager.add_resource(resource) def resource( self, uri: str, *, name: str | None = None, description: str | None = None, mime_type: str | None = None, ) -> Callable[[AnyFunction], AnyFunction]: """Decorator to register a function as a resource. The function will be called when the resource is read to generate its content. The function can return: - str for text content - bytes for binary content - other types will be converted to JSON If the URI contains parameters (e.g. "resource://{param}") or the function has parameters, it will be registered as a template resource. Args: uri: URI for the resource (e.g. "resource://my-resource" or "resource://{param}") name: Optional name for the resource description: Optional description of the resource mime_type: Optional MIME type for the resource Example: @server.resource("resource://my-resource") def get_data() -> str: return "Hello, world!" @server.resource("resource://my-resource") async get_data() -> str: data = await fetch_data() return f"Hello, world! {data}" @server.resource("resource://{city}/weather") def get_weather(city: str) -> str: return f"Weather for {city}" @server.resource("resource://{city}/weather") async def get_weather(city: str) -> str: data = await fetch_weather(city) return f"Weather for {city}: {data}" """ # Check if user passed function directly instead of calling decorator if callable(uri): raise TypeError( "The @resource decorator was used incorrectly. " "Did you forget to call it? Use @resource('uri') instead of @resource" ) def decorator(fn: AnyFunction) -> AnyFunction: # Check if this should be a template has_uri_params = "{" in uri and "}" in uri has_func_params = bool(inspect.signature(fn).parameters) if has_uri_params or has_func_params: # Validate that URI params match function params uri_params = set(re.findall(r"{(\w+)}", uri)) func_params = set(inspect.signature(fn).parameters.keys()) if uri_params != func_params: raise ValueError( f"Mismatch between URI parameters {uri_params} " f"and function parameters {func_params}" ) # Register as template self._resource_manager.add_template( fn=fn, uri_template=uri, name=name, description=description, mime_type=mime_type or "text/plain", ) else: # Register as regular resource resource = FunctionResource( uri=AnyUrl(uri), name=name, description=description, mime_type=mime_type or "text/plain", fn=fn, ) self.add_resource(resource) return fn return decorator def add_prompt(self, prompt: Prompt) -> None: """Add a prompt to the server. Args: prompt: A Prompt instance to add """ self._prompt_manager.add_prompt(prompt) def prompt( self, name: str | None = None, description: str | None = None ) -> Callable[[AnyFunction], AnyFunction]: """Decorator to register a prompt. Args: name: Optional name for the prompt (defaults to function name) description: Optional description of what the prompt does Example: @server.prompt() def analyze_table(table_name: str) -> list[Message]: schema = read_table_schema(table_name) return [ { "role": "user", "content": f"Analyze this schema:\n{schema}" } ] @server.prompt() async def analyze_file(path: str) -> list[Message]: content = await read_file(path) return [ { "role": "user", "content": { "type": "resource", "resource": { "uri": f"file://{path}", "text": content } } } ] """ # Check if user passed function directly instead of calling decorator if callable(name): raise TypeError( "The @prompt decorator was used incorrectly. " "Did you forget to call it? Use @prompt() instead of @prompt" ) def decorator(func: AnyFunction) -> AnyFunction: prompt = Prompt.from_function(func, name=name, description=description) self.add_prompt(prompt) return func return decorator def custom_route( self, path: str, methods: list[str], name: str | None = None, include_in_schema: bool = True, ): """ Decorator to register a custom HTTP route on the FastMCP server. Allows adding arbitrary HTTP endpoints outside the standard MCP protocol, which can be useful for OAuth callbacks, health checks, or admin APIs. The handler function must be an async function that accepts a Starlette Request and returns a Response. Args: path: URL path for the route (e.g., "/oauth/callback") methods: List of HTTP methods to support (e.g., ["GET", "POST"]) name: Optional name for the route (to reference this route with Starlette's reverse URL lookup feature) include_in_schema: Whether to include in OpenAPI schema, defaults to True Example: @server.custom_route("/health", methods=["GET"]) async def health_check(request: Request) -> Response: return JSONResponse({"status": "ok"}) """ def decorator( func: Callable[[Request], Awaitable[Response]], ) -> Callable[[Request], Awaitable[Response]]: self._custom_starlette_routes.append( Route( path, endpoint=func, methods=methods, name=name, include_in_schema=include_in_schema, ) ) return func return decorator async def run_stdio_async(self) -> None: """Run the server using stdio transport.""" async with stdio_server() as (read_stream, write_stream): await self._mcp_server.run( read_stream, write_stream, self._mcp_server.create_initialization_options(), ) async def run_sse_async(self) -> None: """Run the server using SSE transport.""" import uvicorn starlette_app = self.sse_app() config = uvicorn.Config( starlette_app, host=self.settings.host, port=self.settings.port, log_level=self.settings.log_level.lower(), ) server = uvicorn.Server(config) await server.serve() def sse_app(self) -> Starlette: """Return an instance of the SSE server app.""" from starlette.middleware import Middleware from starlette.routing import Mount, Route # Set up auth context and dependencies sse = SseServerTransport(self.settings.message_path) async def handle_sse(scope: Scope, receive: Receive, send: Send): # Add client ID from auth context into request context if available async with sse.connect_sse( scope, receive, send, ) as streams: await self._mcp_server.run( streams[0], streams[1], self._mcp_server.create_initialization_options(), ) return Response() # Create routes routes: list[Route | Mount] = [] middleware: list[Middleware] = [] required_scopes = [] # Add auth endpoints if auth provider is configured if self._auth_server_provider: assert self.settings.auth from mcp.server.auth.routes import create_auth_routes required_scopes = self.settings.auth.required_scopes or [] middleware = [ # extract auth info from request (but do not require it) Middleware( AuthenticationMiddleware, backend=BearerAuthBackend( provider=self._auth_server_provider, ), ), # Add the auth context middleware to store # authenticated user in a contextvar Middleware(AuthContextMiddleware), ] routes.extend( create_auth_routes( provider=self._auth_server_provider, issuer_url=self.settings.auth.issuer_url, service_documentation_url=self.settings.auth.service_documentation_url, client_registration_options=self.settings.auth.client_registration_options, revocation_options=self.settings.auth.revocation_options, ) ) routes.append( Route( self.settings.sse_path, endpoint=RequireAuthMiddleware(handle_sse, required_scopes), methods=["GET"], ) ) routes.append( Mount( self.settings.message_path, app=RequireAuthMiddleware(sse.handle_post_message, required_scopes), ) ) # mount these routes last, so they have the lowest route matching precedence routes.extend(self._custom_starlette_routes) # Create Starlette app with routes and middleware return Starlette( debug=self.settings.debug, routes=routes, middleware=middleware ) async def list_prompts(self) -> list[MCPPrompt]: """List all available prompts.""" prompts = self._prompt_manager.list_prompts() return [ MCPPrompt( name=prompt.name, description=prompt.description, arguments=[ MCPPromptArgument( name=arg.name, description=arg.description, required=arg.required, ) for arg in (prompt.arguments or []) ], ) for prompt in prompts ] async def get_prompt( self, name: str, arguments: dict[str, Any] | None = None ) -> GetPromptResult: """Get a prompt by name with arguments.""" try: messages = await self._prompt_manager.render_prompt(name, arguments) return GetPromptResult(messages=pydantic_core.to_jsonable_python(messages)) except Exception as e: logger.error(f"Error getting prompt {name}: {e}") raise ValueError(str(e)) def _convert_to_content( result: Any, ) -> Sequence[TextContent | ImageContent | EmbeddedResource]: """Convert a result to a sequence of content objects.""" if result is None: return [] if isinstance(result, TextContent | ImageContent | EmbeddedResource): return [result] if isinstance(result, Image): return [result.to_image_content()] if isinstance(result, list | tuple): return list(chain.from_iterable(_convert_to_content(item) for item in result)) # type: ignore[reportUnknownVariableType] if not isinstance(result, str): result = pydantic_core.to_json(result, fallback=str, indent=2).decode() return [TextContent(type="text", text=result)] class Context(BaseModel, Generic[ServerSessionT, LifespanContextT]): """Context object providing access to MCP capabilities. This provides a cleaner interface to MCP's RequestContext functionality. It gets injected into tool and resource functions that request it via type hints. To use context in a tool function, add a parameter with the Context type annotation: ```python @server.tool() def my_tool(x: int, ctx: Context) -> str: # Log messages to the client ctx.info(f"Processing {x}") ctx.debug("Debug info") ctx.warning("Warning message") ctx.error("Error message") # Report progress ctx.report_progress(50, 100) # Access resources data = ctx.read_resource("resource://data") # Get request info request_id = ctx.request_id client_id = ctx.client_id return str(x) ``` The context parameter name can be anything as long as it's annotated with Context. The context is optional - tools that don't need it can omit the parameter. """ _request_context: RequestContext[ServerSessionT, LifespanContextT] | None _fastmcp: FastMCP | None def __init__( self, *, request_context: RequestContext[ServerSessionT, LifespanContextT] | None = None, fastmcp: FastMCP | None = None, **kwargs: Any, ): super().__init__(**kwargs) self._request_context = request_context self._fastmcp = fastmcp @property def fastmcp(self) -> FastMCP: """Access to the FastMCP server.""" if self._fastmcp is None: raise ValueError("Context is not available outside of a request") return self._fastmcp @property def request_context(self) -> RequestContext[ServerSessionT, LifespanContextT]: """Access to the underlying request context.""" if self._request_context is None: raise ValueError("Context is not available outside of a request") return self._request_context async def report_progress( self, progress: float, total: float | None = None ) -> None: """Report progress for the current operation. Args: progress: Current progress value e.g. 24 total: Optional total value e.g. 100 """ progress_token = ( self.request_context.meta.progressToken if self.request_context.meta else None ) if progress_token is None: return await self.request_context.session.send_progress_notification( progress_token=progress_token, progress=progress, total=total ) async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]: """Read a resource by URI. Args: uri: Resource URI to read Returns: The resource content as either text or bytes """ assert ( self._fastmcp is not None ), "Context is not available outside of a request" return await self._fastmcp.read_resource(uri) async def log( self, level: Literal["debug", "info", "warning", "error"], message: str, *, logger_name: str | None = None, ) -> None: """Send a log message to the client. Args: level: Log level (debug, info, warning, error) message: Log message logger_name: Optional logger name **extra: Additional structured data to include """ await self.request_context.session.send_log_message( level=level, data=message, logger=logger_name, related_request_id=self.request_id, ) @property def client_id(self) -> str | None: """Get the client ID if available.""" return ( getattr(self.request_context.meta, "client_id", None) if self.request_context.meta else None ) @property def request_id(self) -> str: """Get the unique ID for this request.""" return str(self.request_context.request_id) @property def session(self): """Access to the underlying session for advanced usage.""" return self.request_context.session # Convenience methods for common log levels async def debug(self, message: str, **extra: Any) -> None: """Send a debug log message.""" await self.log("debug", message, **extra) async def info(self, message: str, **extra: Any) -> None: """Send an info log message.""" await self.log("info", message, **extra) async def warning(self, message: str, **extra: Any) -> None: """Send a warning log message.""" await self.log("warning", message, **extra) async def error(self, message: str, **extra: Any) -> None: """Send an error log message.""" await self.log("error", message, **extra)