Appearance
第11章 Python Client 实现剖析
前面几章我们分析了 TypeScript SDK 的 Client 实现,对 MCP 客户端的职责有了清晰认知。本章我们将目光转向 Python SDK 的客户端实现。Python SDK 在设计哲学上与 TypeScript SDK 保持一致——都围绕"会话"这一核心抽象展开,但在具体实现上却因语言特性而呈现出显著差异。Python SDK 选择了 anyio 作为异步运行时抽象层,用 Pydantic 取代 Zod 进行类型校验,用 async with 上下文管理器替代手动的生命周期管理。这些选择不是偶然的,它们深刻影响了客户端的 API 形态和内部架构。
11.1 客户端架构总览
Python MCP Client 的源码位于 src/mcp/client/ 目录下,核心文件包括:
session.py—ClientSession类,单服务器会话的核心抽象session_group.py—ClientSessionGroup类,多服务器聚合管理stdio.py— stdio 传输层实现sse.py— SSE 传输层实现streamable_http.py— Streamable HTTP 传输层实现auth/oauth2.py— OAuth2 客户端认证
下面这张架构图展示了各模块之间的关系:
11.2 ClientSession:单服务器会话
ClientSession 是 Python 客户端的核心类,它继承自 BaseSession,负责与单个 MCP Server 的全部通信。我们先看其类型签名:
python
class ClientSession(
BaseSession[
types.ClientRequest, # 发送的请求类型
types.ClientNotification, # 发送的通知类型
types.ClientResult, # 发送的结果类型
types.ServerRequest, # 接收的请求类型
types.ServerNotification, # 接收的通知类型
]
):这个五元泛型参数看起来复杂,但逻辑清晰:前三个约束"我发出去的消息",后两个约束"我收到的消息"。BaseSession 利用这些泛型参数在编译期保证消息类型的正确性。
11.2.1 构造与初始化
ClientSession 的构造函数接受读写流和一系列回调函数:
python
def __init__(
self,
read_stream: ReadStream[SessionMessage | Exception],
write_stream: WriteStream[SessionMessage],
read_timeout_seconds: float | None = None,
sampling_callback: SamplingFnT | None = None,
elicitation_callback: ElicitationFnT | None = None,
list_roots_callback: ListRootsFnT | None = None,
logging_callback: LoggingFnT | None = None,
message_handler: MessageHandlerFnT | None = None,
client_info: types.Implementation | None = None,
):这些回调函数的设计值得关注。Python SDK 使用 Protocol 类来定义回调类型,这是 Python 式的结构化子类型(structural subtyping),等价于 TypeScript 的接口但更灵活:
python
class SamplingFnT(Protocol):
async def __call__(
self,
context: RequestContext[ClientSession],
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData: ...每个回调都有默认实现。例如,当服务器发起 sampling 请求但客户端未注册回调时,默认返回错误:
python
async def _default_sampling_callback(
context: RequestContext[ClientSession],
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData:
return types.ErrorData(
code=types.INVALID_REQUEST,
message="Sampling not supported",
)初始化握手通过 initialize() 方法完成。该方法会根据注册的回调函数自动推断客户端能力——如果注册了 sampling_callback,就声明支持 sampling 能力;如果注册了 list_roots_callback,就声明支持 roots 能力。这种"注册即声明"的设计避免了能力声明与实际实现不一致的问题。
11.2.2 协议操作方法
ClientSession 为 MCP 协议的每种操作提供了对应的异步方法:
| 方法 | 协议操作 | 返回类型 |
|---|---|---|
list_tools() | tools/list | ListToolsResult |
call_tool() | tools/call | CallToolResult |
list_resources() | resources/list | ListResourcesResult |
read_resource() | resources/read | ReadResourceResult |
list_prompts() | prompts/list | ListPromptsResult |
get_prompt() | prompts/get | GetPromptResult |
complete() | completion/complete | CompleteResult |
send_ping() | ping | EmptyResult |
以 call_tool 为例,它展示了 Python 客户端的一个独特特性——结构化内容校验:
python
async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: float | None = None,
progress_callback: ProgressFnT | None = None,
) -> types.CallToolResult:
result = await self.send_request(
types.CallToolRequest(
params=types.CallToolRequestParams(name=name, arguments=arguments),
),
types.CallToolResult,
request_read_timeout_seconds=read_timeout_seconds,
progress_callback=progress_callback,
)
if not result.is_error:
await self._validate_tool_result(name, result)
return result_validate_tool_result 会根据工具的 output_schema 对返回的 structured_content 进行 JSON Schema 校验。工具的 output schema 在 list_tools() 调用时被缓存到 _tool_output_schemas 字典中,后续的 call_tool 会自动利用这个缓存。如果缓存中没有该工具的 schema(比如工具是后来动态添加的),会自动触发一次 list_tools() 刷新缓存。
11.2.3 服务器请求处理
MCP 是双向协议,服务器也会向客户端发起请求。_received_request 方法通过 Python 的 match/case 模式匹配来分发这些请求:
python
async def _received_request(self, responder):
ctx = RequestContext[ClientSession](
request_id=responder.request_id,
meta=responder.request_meta,
session=self,
)
match responder.request:
case types.CreateMessageRequest(params=params):
with responder:
response = await self._sampling_callback(ctx, params)
await responder.respond(response)
case types.ElicitRequest(params=params):
with responder:
response = await self._elicitation_callback(ctx, params)
await responder.respond(response)
case types.ListRootsRequest():
with responder:
response = await self._list_roots_callback(ctx)
await responder.respond(response)注意 with responder 上下文管理器的使用。RequestResponder 实现了 __enter__/__exit__,在进入时设置取消作用域(CancelScope),在退出时通知会话该请求已处理完毕。这确保了即使回调函数抛出异常,请求的生命周期也能被正确管理。
11.3 ClientSessionGroup:多服务器聚合
真实的 AI Agent 应用往往需要同时连接多个 MCP Server。ClientSessionGroup 正是为此设计的——它管理多个 ClientSession,并将所有服务器的 tools、resources、prompts 聚合到统一的命名空间中。
11.3.1 连接管理与生命周期
ClientSessionGroup 实现了 async with 协议,用 AsyncExitStack 管理所有子会话的生命周期:
python
class ClientSessionGroup:
def __init__(
self,
exit_stack: contextlib.AsyncExitStack | None = None,
component_name_hook: _ComponentNameHook | None = None,
):
self._tools = {}
self._resources = {}
self._prompts = {}
self._sessions = {}
self._tool_to_session = {}
if exit_stack is None:
self._exit_stack = contextlib.AsyncExitStack()
self._owns_exit_stack = True
else:
self._exit_stack = exit_stack
self._owns_exit_stack = False这里有一个精妙的设计:exit_stack 可以外部传入,也可以内部创建。如果外部传入,ClientSessionGroup 不负责关闭它(_owns_exit_stack = False)。这使得 ClientSessionGroup 可以嵌入到更大的资源管理体系中。
连接新服务器通过 connect_to_server 完成:
python
async with ClientSessionGroup() as group:
session_a = await group.connect_to_server(
StdioServerParameters(command="python", args=["-m", "my_server"])
)
session_b = await group.connect_to_server(
StreamableHttpParameters(url="http://localhost:8080/mcp")
)
# 此时 group.tools 包含两个服务器的所有工具
result = await group.call_tool("read_file", {"path": "/tmp/test.txt"})_establish_session 方法展示了传输层的选择逻辑——根据 ServerParameters 的具体类型(StdioServerParameters、SseServerParameters、StreamableHttpParameters)自动选择对应的传输层实现。每个会话都有独立的 AsyncExitStack,确保单个服务器断连不会影响其他会话。
11.3.2 命名冲突处理
当多个服务器提供同名工具时,ClientSessionGroup 默认会抛出错误。但它提供了 component_name_hook 机制来自定义命名策略:
python
name_fn = lambda name, server_info: f"{server_info.name}_{name}"
async with ClientSessionGroup(component_name_hook=name_fn) as group:
await group.connect_to_server(server_a_params) # tools: "serverA_read"
await group.connect_to_server(server_b_params) # tools: "serverB_read"聚合过程使用临时字典来保证原子性——如果聚合过程中任何一步失败,已有的聚合状态不会被污染:
python
async def _aggregate_components(self, server_info, session):
prompts_temp: dict[str, types.Prompt] = {}
resources_temp: dict[str, types.Resource] = {}
tools_temp: dict[str, types.Tool] = {}
# ... 收集所有组件到临时字典 ...
# 检查重复
matching_tools = tools_temp.keys() & self._tools.keys()
if matching_tools:
raise MCPError(...)
# 原子性地合并
self._tools.update(tools_temp)11.3.3 动态断连
disconnect_from_server 支持运行时动态移除某个服务器。它通过 _ComponentNames 反向索引快速定位该会话注册的所有组件,逐一清理:
python
async def disconnect_from_server(self, session):
component_names = self._sessions.pop(session)
for name in component_names.tools:
del self._tools[name]
del self._tool_to_session[name]
# 关闭该会话的 exit_stack
session_stack = self._session_exit_stacks.pop(session)
await session_stack.aclose()11.4 传输层:三种连接方式
Python SDK 提供了三种传输层实现,它们都遵循相同的模式——作为异步上下文管理器,yield 出 (read_stream, write_stream) 元组。
11.4.1 stdio 传输
stdio_client 通过 anyio.open_process 启动子进程,将子进程的 stdin/stdout 包装为 MCP 消息流。它在内部启动两个并发任务:stdout_reader 负责从子进程读取 JSON-RPC 消息并写入 read_stream,stdin_writer 负责从 write_stream 读取消息并写入子进程的 stdin。
关闭时遵循 MCP 规范的 graceful shutdown 序列:先关闭 stdin,等待进程自行退出;超时后发送 SIGTERM;仍未退出则发送 SIGKILL。
11.4.2 SSE 与 Streamable HTTP 传输
SSE 传输使用 httpx + httpx-sse 库建立长连接。Streamable HTTP 传输是 SSE 的演进版本,支持双向流式通信,并引入了 mcp-session-id 头来维护会话状态。两者都可以通过 OAuth2Auth 中间件进行认证。
11.4.3 传输参数模型
session_group.py 中定义了三种传输参数的 Pydantic 模型,统一为 ServerParameters 类型别名:
python
ServerParameters: TypeAlias = (
StdioServerParameters | SseServerParameters | StreamableHttpParameters
)这使得 connect_to_server 可以接受任意一种参数,内部通过 isinstance 分发到对应的传输层。
11.5 OAuth2 客户端认证
Python SDK 的 OAuth2 实现位于 client/auth/oauth2.py,支持 Authorization Code + PKCE 流程。PKCEParameters 类封装了 code_verifier/code_challenge 的生成:
python
class PKCEParameters(BaseModel):
code_verifier: str = Field(..., min_length=43, max_length=128)
code_challenge: str = Field(..., min_length=43, max_length=128)
@classmethod
def generate(cls) -> "PKCEParameters":
code_verifier = "".join(
secrets.choice(string.ascii_letters + string.digits + "-._~")
for _ in range(128)
)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
return cls(code_verifier=code_verifier, code_challenge=code_challenge)认证流程通过 TokenStorage 协议实现 token 的持久化存储,应用可以自定义存储后端(文件、数据库、密钥链等)。OAuth2Auth 作为 httpx 的 Auth 中间件,自动在请求中注入 Bearer token 并处理 token 刷新。
11.6 与 TypeScript Client 的核心差异
通过对比两个 SDK 的客户端实现,可以提炼出以下关键差异:
| 维度 | Python SDK | TypeScript SDK |
|---|---|---|
| 异步运行时 | anyio(兼容 asyncio/trio) | 原生 async/await |
| 生命周期管理 | async with + AsyncExitStack | 手动 connect()/close() |
| 类型校验 | Pydantic TypeAdapter | Zod schema |
| 回调类型 | Protocol(结构化子类型) | TypeScript 函数类型 |
| 消息分发 | match/case 模式匹配 | switch/case |
| 多服务器 | 内置 ClientSessionGroup | 需自行实现 |
| 工具结果校验 | 内置 _validate_tool_result | 无内置校验 |
| 传输层切换 | isinstance + 上下文管理器 | Transport 接口 |
其中最值得关注的差异有三点:
第一,生命周期管理。 Python 的 async with 将资源获取与释放绑定在语法结构上,不可能忘记关闭连接。TypeScript 需要开发者自觉调用 close(),或使用 try/finally。AsyncExitStack 的嵌套使用更是 Python 独有的模式——ClientSessionGroup 用主 exit_stack 管理所有子会话的 exit_stack,形成资源管理的树状结构。
第二,多服务器聚合。 ClientSessionGroup 是 Python SDK 独有的抽象。它不仅管理连接,还提供了组件聚合、命名冲突检测、动态断连等高级功能。TypeScript SDK 没有对应的内置实现,开发者需要自行管理多个 Client 实例。
第三,工具结果校验。 Python 的 call_tool 在返回结果前会自动校验 structured_content 是否符合工具声明的 output_schema。这利用了 jsonschema 库进行运行时校验,为 Agent 应用提供了额外的安全保障。TypeScript 端没有这个内置机制。
11.7 本章小结
本章深入分析了 MCP Python SDK 的客户端实现。ClientSession 作为单服务器会话的核心抽象,继承自 BaseSession 并提供了完整的 MCP 协议操作方法;ClientSessionGroup 在此基础上实现了多服务器管理与组件聚合。Python SDK 充分利用了 anyio 的结构化并发、async with 上下文管理器、Pydantic 类型校验等语言特性,在保持与 TypeScript SDK 协议兼容性的同时,提供了更符合 Python 生态习惯的 API 设计。传输层的三种实现(stdio、SSE、Streamable HTTP)通过统一的读写流抽象与上下文管理器模式,实现了对应用层的完全透明。OAuth2 认证则作为可插拔的 httpx 中间件,为 HTTP 类传输提供了标准化的安全方案。