import contextlib import logging from collections.abc import AsyncIterator import anyio import click import mcp.types as types from mcp.server.lowlevel import Server from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from pydantic import AnyUrl from starlette.applications import Starlette from starlette.routing import Mount from starlette.types import Receive, Scope, Send from .event_store import InMemoryEventStore # Configure logging logger = logging.getLogger(__name__) @click.command() @click.option("--port", default=3000, help="Port to listen on for HTTP") @click.option( "--log-level", default="INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", ) @click.option( "--json-response", is_flag=True, default=False, help="Enable JSON responses instead of SSE streams", ) def main( port: int, log_level: str, json_response: bool, ) -> int: # Configure logging logging.basicConfig( level=getattr(logging, log_level.upper()), format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) app = Server("mcp-streamable-http-demo") @app.call_tool() async def call_tool( name: str, arguments: dict ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: ctx = app.request_context interval = arguments.get("interval", 1.0) count = arguments.get("count", 5) caller = arguments.get("caller", "unknown") # Send the specified number of notifications with the given interval for i in range(count): # Include more detailed message for resumability demonstration notification_msg = ( f"[{i+1}/{count}] Event from '{caller}' - " f"Use Last-Event-ID to resume if disconnected" ) await ctx.session.send_log_message( level="info", data=notification_msg, logger="notification_stream", # Associates this notification with the original request # Ensures notifications are sent to the correct response stream # Without this, notifications will either go to: # - a standalone SSE stream (if GET request is supported) # - nowhere (if GET request isn't supported) related_request_id=ctx.request_id, ) logger.debug(f"Sent notification {i+1}/{count} for caller: {caller}") if i < count - 1: # Don't wait after the last notification await anyio.sleep(interval) # This will send a resource notificaiton though standalone SSE # established by GET request await ctx.session.send_resource_updated(uri=AnyUrl("http:///test_resource")) return [ types.TextContent( type="text", text=( f"Sent {count} notifications with {interval}s interval" f" for caller: {caller}" ), ) ] @app.list_tools() async def list_tools() -> list[types.Tool]: return [ types.Tool( name="start-notification-stream", description=( "Sends a stream of notifications with configurable count" " and interval" ), inputSchema={ "type": "object", "required": ["interval", "count", "caller"], "properties": { "interval": { "type": "number", "description": "Interval between notifications in seconds", }, "count": { "type": "number", "description": "Number of notifications to send", }, "caller": { "type": "string", "description": ( "Identifier of the caller to include in notifications" ), }, }, }, ) ] # Create event store for resumability # The InMemoryEventStore enables resumability support for StreamableHTTP transport. # It stores SSE events with unique IDs, allowing clients to: # 1. Receive event IDs for each SSE message # 2. Resume streams by sending Last-Event-ID in GET requests # 3. Replay missed events after reconnection # Note: This in-memory implementation is for demonstration ONLY. # For production, use a persistent storage solution. event_store = InMemoryEventStore() # Create the session manager with our app and event store session_manager = StreamableHTTPSessionManager( app=app, event_store=event_store, # Enable resumability json_response=json_response, ) # ASGI handler for streamable HTTP connections async def handle_streamable_http( scope: Scope, receive: Receive, send: Send ) -> None: await session_manager.handle_request(scope, receive, send) @contextlib.asynccontextmanager async def lifespan(app: Starlette) -> AsyncIterator[None]: """Context manager for managing session manager lifecycle.""" async with session_manager.run(): logger.info("Application started with StreamableHTTP session manager!") try: yield finally: logger.info("Application shutting down...") # Create an ASGI application using the transport starlette_app = Starlette( debug=True, routes=[ Mount("/mcp", app=handle_streamable_http), ], lifespan=lifespan, ) import uvicorn uvicorn.run(starlette_app, host="127.0.0.1", port=port) return 0