Appearance
第09章 TypeScript Client 实现剖析
"A protocol is only as good as its client implementation. The client is the bridge between the AI model and the world of tools."
本章要点
- 理解 Client 类如何继承 Protocol 基类,通过 connect() 完成初始化握手与能力协商
- 掌握 listTools、callTool、listResources、readResource 等核心 API 的实现细节
- 深入 OAuth 认证体系:AuthProvider 与 OAuthClientProvider 的双层设计
- 理解中间件链(Middleware Chain)的组合模式与实际应用
- 掌握 listChanged 通知处理器的防抖机制与自动刷新策略
- 理解 Transport 选择策略与重连逻辑
9.1 Client 类的整体架构
在前一章中,我们剖析了 Server 端的实现。现在让我们转向对称的另一面——Client 端。MCP TypeScript SDK 中的 Client 类位于 packages/client/src/client/client.ts,它是 AI 应用(如 Claude Desktop、Cursor)与 MCP Server 之间的桥梁。
Client 的核心职责可以归纳为三件事:
- 连接与握手——通过 Transport 连接 Server,完成
initialize/initialized握手,协商协议版本与能力 - 请求代理——将上层应用的调用(listTools、callTool 等)转化为 JSON-RPC 请求发送给 Server
- 事件处理——监听 Server 推送的通知(工具列表变更、资源变更等),驱动上层应用更新
构造函数:能力声明与验证器初始化
Client 的构造函数接收两个参数:客户端身份信息(Implementation)和可选配置(ClientOptions)。这里有几个值得注意的设计:
typescript
constructor(
private _clientInfo: Implementation,
options?: ClientOptions
) {
super({
...options,
tasks: extractTaskManagerOptions(options?.capabilities?.tasks)
});
this._capabilities = options?.capabilities
? { ...options.capabilities } : {};
this._jsonSchemaValidator = options?.jsonSchemaValidator
?? new DefaultJsonSchemaValidator();
this._enforceStrictCapabilities =
options?.enforceStrictCapabilities ?? false;
// Strip runtime-only fields from advertised capabilities
if (options?.capabilities?.tasks) {
const { taskStore, taskMessageQueue,
defaultTaskPollInterval, maxTaskQueueSize,
...wireCapabilities } = options.capabilities.tasks;
this._capabilities.tasks = wireCapabilities;
}
// Store list changed config for setup after connection
if (options?.listChanged) {
this._pendingListChangedConfig = options.listChanged;
}
}第一个关键决策:运行时字段的剥离。tasks 能力中包含 taskStore、taskMessageQueue 等运行时配置,这些不应该在初始化握手时发送给 Server。SDK 通过解构赋值优雅地分离了"线上能力"(advertised capabilities)和"运行时配置"(runtime options)。
第二个关键决策:listChanged 配置的延迟设置。用户在构造时传入的 listChanged 处理器不会立即注册,而是存储在 _pendingListChangedConfig 中。为什么?因为 listChanged 处理器需要根据 Server 的能力来决定是否启用——只有 Server 声明了 tools.listChanged: true,注册工具变更监听才有意义。这个设置会在 connect() 完成初始化握手后执行。
9.2 连接与初始化握手
connect() 方法是 Client 生命周期的起点。它覆写了 Protocol 基类的 connect(),在建立传输层连接之后,自动执行 MCP 初始化握手:
让我们逐步分析这个流程中的关键逻辑:
首次连接:完整的初始化握手
typescript
override async connect(transport: Transport,
options?: RequestOptions): Promise<void> {
await super.connect(transport);
// 重连检测:sessionId 已存在则跳过初始化
if (transport.sessionId !== undefined) {
if (this._negotiatedProtocolVersion !== undefined
&& transport.setProtocolVersion) {
transport.setProtocolVersion(
this._negotiatedProtocolVersion);
}
return;
}
try {
const result = await this._requestWithSchema(
{
method: 'initialize',
params: {
protocolVersion:
this._supportedProtocolVersions[0]
?? LATEST_PROTOCOL_VERSION,
capabilities: this._capabilities,
clientInfo: this._clientInfo
}
},
InitializeResultSchema,
options
);
// 协议版本校验
if (!this._supportedProtocolVersions
.includes(result.protocolVersion)) {
throw new Error(
`Server's protocol version is not supported: `
+ result.protocolVersion);
}
// 存储协商结果
this._serverCapabilities = result.capabilities;
this._serverVersion = result.serverInfo;
this._negotiatedProtocolVersion = result.protocolVersion;
this._instructions = result.instructions;
// 通知 Server 初始化完成
await this.notification({
method: 'notifications/initialized'
});
// 设置 listChanged 处理器
if (this._pendingListChangedConfig) {
this._setupListChangedHandlers(
this._pendingListChangedConfig);
this._pendingListChangedConfig = undefined;
}
} catch (error) {
void this.close();
throw error;
}
}这段代码有三个值得深思的设计:
协议版本协商的单向性。Client 发送自己支持的最高版本(_supportedProtocolVersions[0]),Server 返回它选择的版本。如果 Server 选择的版本不在 Client 的支持列表中,直接抛出错误。这是一个"Client 提议,Server 决策,Client 校验"的三步协商模式。
失败时的清理策略。catch 块中调用 this.close() 但使用 void 忽略其返回值。这确保了初始化失败时传输层被正确关闭,不会留下僵尸连接,同时避免 close() 自身的错误掩盖真正的初始化错误。
HTTP Transport 的协议版本传递。transport.setProtocolVersion 是一个可选方法,仅 HTTP 类传输(StreamableHTTP)实现。调用后,传输层会在每个后续请求的 HTTP 头中携带 mcp-protocol-version,确保 Server 端能正确路由请求。
重连:跳过握手的快速路径
重连逻辑非常精简:当 transport.sessionId 已存在时,说明这是对已有会话的重连。此时只需恢复协议版本号,不需要重新握手。这个设计依赖一个前提——Server 端通过 sessionId 维护了会话状态,包括之前协商的能力和协议版本。
9.3 核心 API:工具、资源与提示词
Client 提供了一组高度一致的 API 来访问 Server 的三大原语——Tools、Resources 和 Prompts。这些 API 共享一个统一的实现模式。
能力检查:宽松模式与严格模式
每个 API 在发送请求前都会检查 Server 是否声明了对应能力。这里有一个重要的设计决策——宽松模式(默认)和严格模式的选择:
typescript
async listTools(params?: ListToolsRequest['params'],
options?: RequestOptions) {
if (!this._serverCapabilities?.tools
&& !this._enforceStrictCapabilities) {
// 宽松模式:Server 未声明 tools 能力时返回空列表
return { tools: [] };
}
// 严格模式下,assertCapabilityForMethod 会抛出异常
const result = await this._requestWithSchema(
{ method: 'tools/list', params },
ListToolsResultSchema, options);
this.cacheToolMetadata(result.tools);
return result;
}listResources 和 listPrompts 遵循完全相同的模式。宽松模式的设计意图是提升容错性:一个 Client 可能同时连接多个 Server,部分 Server 不支持 tools 是正常的。返回空列表比抛出异常更友好。
而 readResource、callTool、getPrompt 这些"操作型"API 则总是严格检查能力,因为对不存在的能力发起操作本身就是逻辑错误。
callTool:结构化输出验证
callTool 是所有 API 中最复杂的,因为它涉及 outputSchema 验证和任务型工具的拦截:
typescript
async callTool(params: CallToolRequest['params'],
options?: RequestOptions) {
// 拦截 required-task 工具
if (this.isToolTaskRequired(params.name)) {
throw new ProtocolError(
ProtocolErrorCode.InvalidRequest,
`Tool "${params.name}" requires task-based execution.`
);
}
const result = await this._requestWithSchema(
{ method: 'tools/call', params },
CallToolResultSchema, options);
// 获取缓存的输出验证器
const validator = this.getToolOutputValidator(params.name);
if (validator) {
// 有 outputSchema 的工具必须返回 structuredContent
if (!result.structuredContent && !result.isError) {
throw new ProtocolError(
ProtocolErrorCode.InvalidRequest,
`Tool has output schema but no structured content`
);
}
if (result.structuredContent) {
const validationResult =
validator(result.structuredContent);
if (!validationResult.valid) {
throw new ProtocolError(
ProtocolErrorCode.InvalidParams,
`Structured content does not match schema`
);
}
}
}
return result;
}这个实现展现了三层防御:
任务拦截层:标记为
taskSupport: 'required'的工具不允许通过普通callTool调用,必须使用experimental.tasks.callToolStream()。这个检查基于listTools时缓存的元数据。存在性校验层:如果工具声明了
outputSchema,则响应中 必须 包含structuredContent(除非是错误响应)。这防止了 Server 实现的遗漏。Schema 验证层:使用预编译的 JSON Schema 验证器检查
structuredContent是否符合工具声明的输出格式。验证器在listTools时就已编译并缓存,避免了每次调用的重复编译开销。
工具元数据缓存
cacheToolMetadata 在每次 listTools 调用后执行,维护三个缓存:
typescript
private cacheToolMetadata(tools: Tool[]): void {
this._cachedToolOutputValidators.clear();
this._cachedKnownTaskTools.clear();
this._cachedRequiredTaskTools.clear();
for (const tool of tools) {
if (tool.outputSchema) {
const toolValidator =
this._jsonSchemaValidator
.getValidator(tool.outputSchema);
this._cachedToolOutputValidators
.set(tool.name, toolValidator);
}
const taskSupport = tool.execution?.taskSupport;
if (taskSupport === 'required'
|| taskSupport === 'optional') {
this._cachedKnownTaskTools.add(tool.name);
}
if (taskSupport === 'required') {
this._cachedRequiredTaskTools.add(tool.name);
}
}
}这是一个典型的空间换时间策略:将 outputSchema 的编译和 taskSupport 的查找从 callTool 的热路径中移到 listTools 的冷路径中。在实际使用中,listTools 通常只调用一次(或在列表变更时重新调用),而 callTool 会被频繁调用。
资源订阅
资源相关的 API 包括列表查询、内容读取和变更订阅三个层次:
typescript
// 列出所有资源(宽松模式,分页支持)
async listResources(params?, options?) { ... }
// 列出资源 URI 模板
async listResourceTemplates(params?, options?) { ... }
// 读取资源内容
async readResource(params, options?) { ... }
// 订阅资源变更通知(需要 Server 声明 subscribe 能力)
async subscribeResource(params, options?) { ... }
// 取消订阅
async unsubscribeResource(params, options?) { ... }subscribeResource 有一个额外的能力检查——不仅要求 Server 支持 resources,还要求 resources.subscribe 为 true。这是因为资源订阅是资源能力的一个子特性,并非所有支持资源的 Server 都实现了变更通知。
9.4 listChanged 通知:防抖与自动刷新
当 Server 端的工具列表、资源列表或提示词列表发生变更时,它会发送 notifications/tools/list_changed 等通知。Client 需要优雅地处理这些通知,既要及时更新,又不能在高频变更时产生过多请求。
防抖机制的实现
SDK 对每种列表类型(tools、prompts、resources)维护独立的防抖定时器:
typescript
private _setupListChangedHandler<T>(
listType: string,
notificationMethod: NotificationMethod,
options: ListChangedOptions<T>,
fetcher: () => Promise<T[]>
): void {
const { autoRefresh, debounceMs } = parseResult.data;
const { onChanged } = options;
const refresh = async () => {
if (!autoRefresh) {
onChanged(null, null);
return;
}
try {
const items = await fetcher();
onChanged(null, items);
} catch (error) {
const newError = error instanceof Error
? error : new Error(String(error));
onChanged(newError, null);
}
};
const handler = () => {
if (debounceMs) {
const existingTimer =
this._listChangedDebounceTimers.get(listType);
if (existingTimer) {
clearTimeout(existingTimer);
}
const timer = setTimeout(refresh, debounceMs);
this._listChangedDebounceTimers
.set(listType, timer);
} else {
refresh();
}
};
this.setNotificationHandler(notificationMethod, handler);
}这里的 onChanged 回调采用了 Node.js 风格的 error-first 回调模式:onChanged(error, items)。当 autoRefresh 为 false 时,仅通知上层应用"列表已变更"(error 和 items 都为 null),由应用自行决定何时刷新。当 autoRefresh 为 true 时,SDK 自动调用对应的 listTools() / listResources() / listPrompts() 获取最新数据。
使用方式非常直观:
typescript
const client = new Client(
{ name: 'my-client', version: '1.0.0' },
{
listChanged: {
tools: {
autoRefresh: true,
debounceMs: 200,
onChanged: (error, tools) => {
if (error) {
console.error('刷新工具列表失败:', error);
return;
}
console.log('工具列表已更新:', tools);
}
}
}
}
);9.5 Transport 选择与连接策略
Client 本身不绑定任何特定的传输层实现。它通过 connect(transport) 接受一个 Transport 接口的实例,由上层应用决定使用哪种传输方式。SDK 提供了三种内置传输:
| 传输类型 | 类名 | 适用场景 |
|---|---|---|
| Stdio | StdioClientTransport | 本地子进程,开发调试 |
| Streamable HTTP | StreamableHTTPClientTransport | 远程服务,生产部署 |
| SSE (旧版) | SSEClientTransport | 兼容旧版 Server |
SDK 官方推荐的连接策略是先尝试 Streamable HTTP,失败后降级到 SSE:
typescript
async function connectWithFallback(url: string) {
const baseUrl = new URL(url);
try {
// 优先使用现代 Streamable HTTP 传输
const client = new Client({
name: 'my-client', version: '1.0.0' });
const transport =
new StreamableHTTPClientTransport(baseUrl);
await client.connect(transport);
return { client, transport };
} catch {
// 降级到旧版 SSE 传输
const client = new Client({
name: 'my-client', version: '1.0.0' });
const transport = new SSEClientTransport(baseUrl);
await client.connect(transport);
return { client, transport };
}
}注意这里的一个微妙点:降级时需要创建新的 Client 实例,而不是复用旧的。这是因为 connect() 失败时会调用 this.close(),Client 的内部状态已经被清理,不适合重用。
重连逻辑的设计意图
回顾 connect() 中的重连检测:
typescript
if (transport.sessionId !== undefined) {
if (this._negotiatedProtocolVersion !== undefined
&& transport.setProtocolVersion) {
transport.setProtocolVersion(
this._negotiatedProtocolVersion);
}
return;
}这段代码揭示了 MCP 的重连哲学:会话恢复,而非重新建立。当 HTTP 传输因网络中断后恢复时,传输层会携带之前的 sessionId。Client 检测到这一点后,只需将协议版本号同步到新的传输层实例,即可恢复通信。Server 端通过 sessionId 识别出这是同一个客户端,保留之前的上下文。
9.6 请求处理器:Server 发起的请求
MCP 协议不是单向的——Server 也可以向 Client 发起请求。Client 通过 setRequestHandler 注册处理器来响应这些反向请求:
typescript
// 处理 Sampling 请求(Server 请求 Client 调用 LLM)
client.setRequestHandler(
'sampling/createMessage',
async (request) => {
return {
model: 'claude-3-opus',
role: 'assistant',
content: {
type: 'text',
text: 'Response from the model'
}
};
}
);
// 处理 Elicitation 请求(Server 请求用户输入)
client.setRequestHandler(
'elicitation/create',
async (request) => {
return {
action: 'accept',
content: { name: 'user-input-value' }
};
}
);setRequestHandler 覆写了基类的同名方法,对 sampling/createMessage 和 elicitation/create 两个方法添加了额外的验证包装:
这个包装层做了三件事:
- 入参验证——用 Zod Schema 验证 Server 发来的请求格式
- 能力检查——对 elicitation 请求,检查 Client 是否声明了对应的模式支持(form/url)
- 出参验证——对用户 handler 返回的结果进行 Schema 验证,防止返回不合规的响应
特别值得注意的是 Elicitation 的 applyDefaults 机制。当 Client 声明了 elicitation.form.applyDefaults: true,且用户接受了 elicitation 请求时,SDK 会自动将 Schema 中声明的默认值填充到用户提交的数据中:
typescript
if (validatedResult.action === 'accept'
&& validatedResult.content
&& requestedSchema
&& this._capabilities.elicitation?.form?.applyDefaults) {
applyElicitationDefaults(
requestedSchema, validatedResult.content);
}这个自动填充逻辑递归处理嵌套对象和 anyOf / oneOf 组合 Schema,是一个贴心的开发者体验优化。
9.7 OAuth 认证体系
MCP 的认证设计采用了 双层抽象 的模式,兼顾简单场景和复杂场景。
AuthProvider:最小认证接口
最简单的认证只需要提供一个 token:
typescript
interface AuthProvider {
token(): Promise<string | undefined>;
onUnauthorized?(ctx: UnauthorizedContext): Promise<void>;
}这个接口足以覆盖 API Key、Gateway Token 等场景:
typescript
const authProvider: AuthProvider = {
token: async () => process.env.API_KEY
};OAuthClientProvider:完整 OAuth 流程
对于需要用户授权的场景,SDK 提供了完整的 OAuthClientProvider 接口,涵盖 OAuth 2.1 的完整生命周期:
auth() 函数是整个 OAuth 流程的编排器。它的错误恢复策略值得研究——外层 auth() 捕获特定的 OAuth 错误码并进行凭据失效后重试:
typescript
export async function auth(provider, options): Promise<AuthResult> {
try {
return await authInternal(provider, options);
} catch (error) {
if (error instanceof OAuthError) {
if (error.code === OAuthErrorCode.InvalidClient
|| error.code === OAuthErrorCode.UnauthorizedClient) {
// 客户端凭据无效,清除所有凭据后重试
await provider.invalidateCredentials?.('all');
return await authInternal(provider, options);
} else if (error.code === OAuthErrorCode.InvalidGrant) {
// 授权无效(如 refresh token 过期),
// 仅清除 tokens 后重试
await provider.invalidateCredentials?.('tokens');
return await authInternal(provider, options);
}
}
throw error;
}
}这个设计的精妙之处在于分级失效:InvalidClient 清除一切(凭据可能已被吊销),InvalidGrant 只清除 tokens(客户端本身仍然有效,只是需要重新授权)。
客户端认证方法选择
SDK 实现了 OAuth 2.1 的三种客户端认证方法,并通过 selectClientAuthMethod 自动选择最佳方案:
typescript
export function selectClientAuthMethod(
clientInformation: OAuthClientInformationMixed,
supportedMethods: string[]
): ClientAuthMethod {
const hasClientSecret =
clientInformation.client_secret !== undefined;
// 优先使用 DCR 返回的方法
if ('token_endpoint_auth_method' in clientInformation
&& isClientAuthMethod(
clientInformation.token_endpoint_auth_method)
&& (supportedMethods.length === 0
|| supportedMethods.includes(
clientInformation.token_endpoint_auth_method))) {
return clientInformation.token_endpoint_auth_method;
}
// 按安全级别降序选择
// client_secret_basic > client_secret_post > none
if (hasClientSecret
&& supportedMethods.includes('client_secret_basic')) {
return 'client_secret_basic';
}
if (hasClientSecret
&& supportedMethods.includes('client_secret_post')) {
return 'client_secret_post';
}
if (supportedMethods.includes('none')) {
return 'none';
}
return hasClientSecret ? 'client_secret_post' : 'none';
}选择优先级:DCR 指定方法 > client_secret_basic > client_secret_post > none。这确保了在安全性和兼容性之间取得最佳平衡。
双层适配
Transport 层只需要 AuthProvider 的最小接口,但用户可能传入完整的 OAuthClientProvider。SDK 通过 adaptOAuthProvider 进行适配:
typescript
export function adaptOAuthProvider(
provider: OAuthClientProvider
): AuthProvider {
return {
token: async () => {
const tokens = await provider.tokens();
return tokens?.access_token;
},
onUnauthorized: async ctx =>
handleOAuthUnauthorized(provider, ctx)
};
}isOAuthClientProvider 类型守卫用于在运行时区分两种 Provider:
typescript
export function isOAuthClientProvider(
provider: AuthProvider | OAuthClientProvider | undefined
): provider is OAuthClientProvider {
const p = provider as OAuthClientProvider;
return typeof p.tokens === 'function'
&& typeof p.clientInformation === 'function';
}9.8 中间件链:可组合的 Fetch 增强
middleware.ts 提供了一个优雅的中间件系统,用于增强 fetch 函数的行为。这个设计直接借鉴了 Express/Koa 的中间件模式。
中间件类型定义
typescript
type Middleware = (next: FetchLike) => FetchLike;每个中间件接收一个 next 函数(下一层的 fetch),返回一个增强后的 fetch。多个中间件通过 applyMiddlewares 组合:
typescript
export const applyMiddlewares =
(...middleware: Middleware[]): Middleware => {
return next => {
let handler = next;
for (const mw of middleware) {
handler = mw(handler);
}
return handler;
};
};内置中间件
withOAuth——自动添加 Authorization 头,处理 401 响应:
typescript
const withOAuth = (provider, baseUrl?): Middleware =>
next => async (input, init) => {
const makeRequest = async () => {
const headers = new Headers(init?.headers);
const tokens = await provider.tokens();
if (tokens) {
headers.set('Authorization',
`Bearer ${tokens.access_token}`);
}
return next(input, { ...init, headers });
};
let response = await makeRequest();
if (response.status === 401) {
// 尝试重新认证
const result = await auth(provider, { ... });
if (result === 'AUTHORIZED') {
response = await makeRequest(); // 重试
}
}
return response;
};withLogging——请求日志记录,支持自定义日志函数和状态级别过滤:
typescript
const enhancedFetch = applyMiddlewares(
withOAuth(oauthProvider, 'https://api.example.com'),
withLogging({ statusLevel: 400 })
)(fetch);
// 使用增强后的 fetch
const response = await enhancedFetch('https://api.example.com/data');createMiddleware——简化自定义中间件的创建:
typescript
const customAuth = createMiddleware(
async (next, input, init) => {
const headers = new Headers(init?.headers);
headers.set('X-Custom-Auth', 'my-token');
return next(input, { ...init, headers });
}
);中间件系统虽然不直接被 MCP Transport 使用(Transport 有内置的 OAuth 处理),但它为使用 MCP OAuth 基础设施进行通用 HTTP 请求提供了便利。例如,一个应用可能需要用 MCP Server 签发的 token 去访问其他 API 端点——这时中间件链就派上了用场。
9.9 能力断言体系
Client 实现了一套完整的能力断言机制,确保请求不会发送到不支持对应功能的 Server:
typescript
protected assertCapabilityForMethod(method: RequestMethod): void {
switch (method as ClientRequest['method']) {
case 'tools/call':
case 'tools/list':
if (!this._serverCapabilities?.tools) {
throw new SdkError(
SdkErrorCode.CapabilityNotSupported,
`Server does not support tools`);
}
break;
case 'resources/subscribe':
if (!this._serverCapabilities?.resources) {
throw new SdkError(...);
}
// 订阅需要额外检查 subscribe 子能力
if (!this._serverCapabilities.resources.subscribe) {
throw new SdkError(...);
}
break;
// ... 其他方法
}
}能力断言分为三个维度:
| 断言方法 | 检查方向 | 用途 |
|---|---|---|
assertCapabilityForMethod | Client -> Server | 确保 Server 支持该请求 |
assertNotificationCapability | Client -> Server | 确保 Client 有权发送该通知 |
assertRequestHandlerCapability | Server -> Client | 确保 Client 声明了处理该请求的能力 |
这三维断言形成了一个双向能力检查网:Client 不会向不支持的 Server 发请求,也不会在没有声明能力的情况下处理 Server 的反向请求。
9.10 小结
本章从源码层面剖析了 MCP TypeScript Client 的完整实现。让我们回顾核心设计决策:
继承与组合的平衡。Client 通过继承 Protocol 获得 JSON-RPC 通信能力,通过组合 Transport 获得传输灵活性,通过组合 JsonSchemaValidator 获得输出验证能力。这种混合策略在保持代码复用的同时,避免了深层继承的脆弱性。
宽松与严格的弹性。enforceStrictCapabilities 参数让 Client 可以根据部署场景在容错性和安全性之间选择。默认的宽松模式适合多 Server 环境,严格模式适合受控的生产环境。
缓存与延迟初始化。工具元数据在 listTools 时预编译并缓存,listChanged 处理器在初始化握手完成后才注册。这些延迟策略确保了信息在正确的时机可用。
双层认证抽象。简单场景用 AuthProvider(一个 token() 方法),复杂场景用 OAuthClientProvider(完整 OAuth 流程)。通过适配器模式统一两种接口,Transport 层无需关心认证的具体实现。
理解了 Client 的内部机制,我们就能更好地在实际项目中使用它——无论是构建 AI Agent、IDE 插件还是其他需要与 MCP Server 交互的应用。在后续章节中,我们将看到 Python SDK 中相似但不同的 Client 实现,以及各种传输层的具体实现细节。