Files
mcp-python-sdk/tests/server/test_lifespan.py
David Soria Parra 9d0f2daddb refactor: reorganize message handling for better type safety and clarity (#239)
* refactor: improve typing with memory stream type aliases

Move memory stream type definitions to models.py and use them throughout
the codebase for better type safety and maintainability.

GitHub-Issue:#201

* refactor: move streams to ParsedMessage

* refactor: update test files to use ParsedMessage

Updates test files to work with the ParsedMessage stream type aliases
and fixes a line length issue in test_201_client_hangs_on_logging.py.

Github-Issue:#201

* refactor: rename ParsedMessage to MessageFrame for clarity

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>

* refactor: move MessageFrame class to types.py for better code organization

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>

* fix pyright

* refactor: update websocket client to use MessageFrame

Modified the websocket client to work with the new MessageFrame type,
preserving raw message text and properly extracting the root JSON-RPC
message when sending.

Github-Issue:#204

* fix: use NoneType instead of None for type parameters in MessageFrame

🤖 Generated with [Claude Code](https://claude.ai/code)

* refactor: rename root to message
2025-03-13 13:44:55 +00:00

239 lines
7.5 KiB
Python

"""Tests for lifespan functionality in both low-level and FastMCP servers."""
from contextlib import asynccontextmanager
from typing import AsyncIterator
import anyio
import pytest
from pydantic import TypeAdapter
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.lowlevel.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.types import (
ClientCapabilities,
Implementation,
InitializeRequestParams,
JSONRPCMessage,
JSONRPCNotification,
JSONRPCRequest,
MessageFrame,
)
@pytest.mark.anyio
async def test_lowlevel_server_lifespan():
"""Test that lifespan works in low-level server."""
@asynccontextmanager
async def test_lifespan(server: Server) -> AsyncIterator[dict[str, bool]]:
"""Test lifespan context that tracks startup/shutdown."""
context = {"started": False, "shutdown": False}
try:
context["started"] = True
yield context
finally:
context["shutdown"] = True
server = Server("test", lifespan=test_lifespan)
# Create memory streams for testing
send_stream1, receive_stream1 = anyio.create_memory_object_stream(100)
send_stream2, receive_stream2 = anyio.create_memory_object_stream(100)
# Create a tool that accesses lifespan context
@server.call_tool()
async def check_lifespan(name: str, arguments: dict) -> list:
ctx = server.request_context
assert isinstance(ctx.lifespan_context, dict)
assert ctx.lifespan_context["started"]
assert not ctx.lifespan_context["shutdown"]
return [{"type": "text", "text": "true"}]
# Run server in background task
async with (
anyio.create_task_group() as tg,
send_stream1,
receive_stream1,
send_stream2,
receive_stream2,
):
async def run_server():
await server.run(
receive_stream1,
send_stream2,
InitializationOptions(
server_name="test",
server_version="1.0.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
raise_exceptions=True,
)
tg.start_soon(run_server)
# Initialize the server
params = InitializeRequestParams(
protocolVersion="2024-11-05",
capabilities=ClientCapabilities(),
clientInfo=Implementation(name="test-client", version="0.1.0"),
)
await send_stream1.send(
MessageFrame(
message=JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=1,
method="initialize",
params=TypeAdapter(InitializeRequestParams).dump_python(params),
)
),
raw=None,
)
)
response = await receive_stream2.receive()
# Send initialized notification
await send_stream1.send(
MessageFrame(
message=JSONRPCMessage(
root=JSONRPCNotification(
jsonrpc="2.0",
method="notifications/initialized",
)
),
raw=None,
)
)
# Call the tool to verify lifespan context
await send_stream1.send(
MessageFrame(
message=JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=2,
method="tools/call",
params={"name": "check_lifespan", "arguments": {}},
)
),
raw=None,
)
)
# Get response and verify
response = await receive_stream2.receive()
assert response.message.root.result["content"][0]["text"] == "true"
# Cancel server task
tg.cancel_scope.cancel()
@pytest.mark.anyio
async def test_fastmcp_server_lifespan():
"""Test that lifespan works in FastMCP server."""
@asynccontextmanager
async def test_lifespan(server: FastMCP) -> AsyncIterator[dict]:
"""Test lifespan context that tracks startup/shutdown."""
context = {"started": False, "shutdown": False}
try:
context["started"] = True
yield context
finally:
context["shutdown"] = True
server = FastMCP("test", lifespan=test_lifespan)
# Create memory streams for testing
send_stream1, receive_stream1 = anyio.create_memory_object_stream(100)
send_stream2, receive_stream2 = anyio.create_memory_object_stream(100)
# Add a tool that checks lifespan context
@server.tool()
def check_lifespan(ctx: Context) -> bool:
"""Tool that checks lifespan context."""
assert isinstance(ctx.request_context.lifespan_context, dict)
assert ctx.request_context.lifespan_context["started"]
assert not ctx.request_context.lifespan_context["shutdown"]
return True
# Run server in background task
async with (
anyio.create_task_group() as tg,
send_stream1,
receive_stream1,
send_stream2,
receive_stream2,
):
async def run_server():
await server._mcp_server.run(
receive_stream1,
send_stream2,
server._mcp_server.create_initialization_options(),
raise_exceptions=True,
)
tg.start_soon(run_server)
# Initialize the server
params = InitializeRequestParams(
protocolVersion="2024-11-05",
capabilities=ClientCapabilities(),
clientInfo=Implementation(name="test-client", version="0.1.0"),
)
await send_stream1.send(
MessageFrame(
message=JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=1,
method="initialize",
params=TypeAdapter(InitializeRequestParams).dump_python(params),
)
),
raw=None,
)
)
response = await receive_stream2.receive()
# Send initialized notification
await send_stream1.send(
MessageFrame(
message=JSONRPCMessage(
root=JSONRPCNotification(
jsonrpc="2.0",
method="notifications/initialized",
)
),
raw=None,
)
)
# Call the tool to verify lifespan context
await send_stream1.send(
MessageFrame(
message=JSONRPCMessage(
root=JSONRPCRequest(
jsonrpc="2.0",
id=2,
method="tools/call",
params={"name": "check_lifespan", "arguments": {}},
)
),
raw=None,
)
)
# Get response and verify
response = await receive_stream2.receive()
assert response.message.root.result["content"][0]["text"] == "true"
# Cancel server task
tg.cancel_scope.cancel()