import contextlib import logging from collections.abc import AsyncIterator import anyio import click import mcp.types as types from mcp.server.lowlevel import Server from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from starlette.applications import Starlette from starlette.routing import Mount from starlette.types import Receive, Scope, Send logger = logging.getLogger(__name__) @click.command() @click.option("--port", default=3000, help="Port to listen on for HTTP") @click.option( "--log-level", default="INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", ) @click.option( "--json-response", is_flag=True, default=False, help="Enable JSON responses instead of SSE streams", ) def main( port: int, log_level: str, json_response: bool, ) -> int: # Configure logging logging.basicConfig( level=getattr(logging, log_level.upper()), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) app = Server("mcp-streamable-http-stateless-demo") @app.call_tool() async def call_tool( name: str, arguments: dict ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: ctx = app.request_context interval = arguments.get("interval", 1.0) count = arguments.get("count", 5) caller = arguments.get("caller", "unknown") # Send the specified number of notifications with the given interval for i in range(count): await ctx.session.send_log_message( level="info", data=f"Notification {i+1}/{count} from caller: {caller}", logger="notification_stream", related_request_id=ctx.request_id, ) if i < count - 1: # Don't wait after the last notification await anyio.sleep(interval) return [ types.TextContent( type="text", text=( f"Sent {count} notifications with {interval}s interval" f" for caller: {caller}" ), ) ] @app.list_tools() async def list_tools() -> list[types.Tool]: return [ types.Tool( name="start-notification-stream", description=( "Sends a stream of notifications with configurable count" " and interval" ), inputSchema={ "type": "object", "required": ["interval", "count", "caller"], "properties": { "interval": { "type": "number", "description": "Interval between notifications in seconds", }, "count": { "type": "number", "description": "Number of notifications to send", }, "caller": { "type": "string", "description": ( "Identifier of the caller to include in notifications" ), }, }, }, ) ] # Create the session manager with true stateless mode session_manager = StreamableHTTPSessionManager( app=app, event_store=None, json_response=json_response, stateless=True, ) async def handle_streamable_http( scope: Scope, receive: Receive, send: Send ) -> None: await session_manager.handle_request(scope, receive, send) @contextlib.asynccontextmanager async def lifespan(app: Starlette) -> AsyncIterator[None]: """Context manager for session manager.""" async with session_manager.run(): logger.info("Application started with StreamableHTTP session manager!") try: yield finally: logger.info("Application shutting down...") # Create an ASGI application using the transport starlette_app = Starlette( debug=True, routes=[ Mount("/mcp", app=handle_streamable_http), ], lifespan=lifespan, ) import uvicorn uvicorn.run(starlette_app, host="127.0.0.1", port=port) return 0