Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/mcp/shared/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
INVALID_PARAMS,
REQUEST_TIMEOUT,
CancelledNotification,
CancelledNotificationParams,
ClientNotification,
ClientRequest,
ClientResult,
Expand Down Expand Up @@ -269,6 +270,7 @@ async def send_request(
# Store the callback for this request
self._progress_callbacks[request_id] = progress_callback

request_sent = False
try:
target = request_data.get("params", {}).get("name")
span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}"
Expand All @@ -284,6 +286,7 @@ async def send_request(

jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data)
await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata))
request_sent = True

# request read timeout takes precedence over session read timeout
timeout = request_read_timeout_seconds or self._session_read_timeout_seconds
Expand All @@ -301,6 +304,26 @@ async def send_request(
else:
return result_type.model_validate(response_or_error.result, by_name=False)

except anyio.get_cancelled_exc_class():
# Automatically notify the other side when a task/scope is cancelled,
# so the peer can abort work for a request nobody is waiting for.
if request_sent:
with anyio.CancelScope(shield=True):
try:
# Add a short timeout to prevent deadlock if the transport buffer is full
with anyio.move_on_after(2.0):
await self.send_notification(
CancelledNotification( # type: ignore[arg-type]
params=CancelledNotificationParams(
request_id=request_id,
reason="Task cancelled",
)
)
)
except Exception:
pass # Transport may already be closed
raise

finally:
self._response_streams.pop(request_id, None)
self._progress_callbacks.pop(request_id, None)
Expand Down
61 changes: 61 additions & 0 deletions tests/server/test_cancel_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,64 @@ async def run_server():
# Without the fixes: RuntimeError (dict mutation) or ClosedResourceError
# (respond after write-stream close) escapes run_server and this hangs.
await server_run_returned.wait()


@pytest.mark.anyio
async def test_anyio_cancel_scope_sends_cancelled_notification() -> None:
"""Cancelling a call_tool via anyio cancel scope should automatically
send notifications/cancelled to the server, causing it to abort the handler."""

tool_started = anyio.Event()
handler_cancelled = anyio.Event()

async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult:
return ListToolsResult(
tools=[
Tool(
name="slow_tool",
description="A slow tool for testing cancellation",
input_schema={},
)
]
)

async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
if params.name == "slow_tool":
tool_started.set()
try:
await anyio.sleep_forever()
except anyio.get_cancelled_exc_class():
handler_cancelled.set()
raise
raise ValueError(f"Unknown tool: {params.name}") # pragma: no cover

server = Server(
"test-server",
on_list_tools=handle_list_tools,
on_call_tool=handle_call_tool,
)

async with Client(server) as client:
# Cancel the call_tool via anyio scope cancellation.
# send_request should automatically send notifications/cancelled.
async with anyio.create_task_group() as tg:

async def do_call() -> None:
with anyio.CancelScope() as scope:
# Store scope so the outer task can cancel it
do_call.scope = scope # type: ignore[attr-defined]
await client.call_tool("slow_tool", {})

tg.start_soon(do_call)

# Wait for the server handler to start
await tool_started.wait()

# Cancel the client-side scope — this should trigger auto-notification
do_call.scope.cancel() # type: ignore[attr-defined]

# Give the server a moment to process the cancellation
await anyio.sleep(0.1)

# The server handler should have been cancelled via the notification
assert handler_cancelled.is_set(), "Server handler was not cancelled — notifications/cancelled was not sent"
Loading