"""
|
|
In-memory event store for demonstrating resumability functionality.
|
|
|
|
This is a simple implementation intended for examples and testing,
|
|
not for production use where a persistent storage solution would be more appropriate.
|
|
"""
|
|
|
|
import logging
from collections import deque
from dataclasses import dataclass
from uuid import uuid4

from mcp.server.streamable_http import (
    EventCallback,
    EventId,
    EventMessage,
    EventStore,
    StreamId,
)
from mcp.types import JSONRPCMessage

logger = logging.getLogger(__name__)


@dataclass
class EventEntry:
    """Represents an event entry in the event store."""

    event_id: EventId
    stream_id: StreamId
    message: JSONRPCMessage


class InMemoryEventStore(EventStore):
    """
    Simple in-memory implementation of the EventStore interface for resumability.

    This is primarily intended for examples and testing, not for production use,
    where a persistent storage solution would be more appropriate.

    This implementation keeps only the last N events per stream for memory efficiency.
    """

    def __init__(self, max_events_per_stream: int = 100):
        """Initialize the event store.

        Args:
            max_events_per_stream: Maximum number of events to keep per stream.
        """
        self.max_events_per_stream = max_events_per_stream
        # Last-N event history per stream; the bounded deque evicts the oldest
        # entries automatically.
        self.streams: dict[StreamId, deque[EventEntry]] = {}
        # event_id -> EventEntry for quick lookup during replay.
        self.event_index: dict[EventId, EventEntry] = {}

    async def store_event(
        self, stream_id: StreamId, message: JSONRPCMessage
    ) -> EventId:
        """Stores an event with a generated event ID."""
        event_id = str(uuid4())
        event_entry = EventEntry(
            event_id=event_id, stream_id=stream_id, message=message
        )

        # Get or create the deque for this stream
        if stream_id not in self.streams:
            self.streams[stream_id] = deque(maxlen=self.max_events_per_stream)

        # If the deque is full, the oldest event will be automatically removed,
        # so we need to remove it from the event_index as well
        if len(self.streams[stream_id]) == self.max_events_per_stream:
            oldest_event = self.streams[stream_id][0]
            self.event_index.pop(oldest_event.event_id, None)

        # Add the new event to both the stream history and the index
        self.streams[stream_id].append(event_entry)
        self.event_index[event_id] = event_entry

        return event_id

    async def replay_events_after(
        self,
        last_event_id: EventId,
        send_callback: EventCallback,
    ) -> StreamId | None:
        """Replays events that occurred after the specified event ID."""
        if last_event_id not in self.event_index:
            logger.warning(f"Event ID {last_event_id} not found in store")
            return None

        # Get the stream and find events after the last one
        last_event = self.event_index[last_event_id]
        stream_id = last_event.stream_id
        stream_events = self.streams.get(stream_id, deque())

        # Events in the deque are already in chronological order; events that
        # have been evicted from the bounded deque can no longer be replayed
        found_last = False
        for event in stream_events:
            if found_last:
                await send_callback(EventMessage(event.message, event.event_id))
            elif event.event_id == last_event_id:
                found_last = True

        return stream_id
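

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original example): drives the
# store by hand to show resumability. In a real server the store is passed to
# the streamable HTTP transport, which calls store_event/replay_events_after
# itself. Wrapping a JSONRPCRequest in JSONRPCMessage below assumes the models
# defined in mcp.types; the stream ID "stream-1" is an arbitrary placeholder.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    from mcp.types import JSONRPCRequest

    async def demo() -> None:
        store = InMemoryEventStore(max_events_per_stream=10)

        # Store three events on one stream, remembering each generated ID.
        event_ids: list[EventId] = []
        for i in range(3):
            message = JSONRPCMessage(
                JSONRPCRequest(jsonrpc="2.0", id=i, method="ping")
            )
            event_ids.append(await store.store_event("stream-1", message))

        # A client resuming from the first event should see the later two.
        async def on_event(event: EventMessage) -> None:
            print(f"replayed event {event.event_id}")

        stream_id = await store.replay_events_after(event_ids[0], on_event)
        print(f"resumed stream: {stream_id}")

    asyncio.run(demo())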