Appearance
第13章 Streamable HTTP:远程流式传输
在第12章中,我们分析了 stdio 传输——一种简单而高效的本地通信方式。但现实世界中的 AI 系统,从多人协作的开发环境到云端部署的 Agent 平台,都需要跨网络的远程通信能力。这就是 Streamable HTTP 传输层要解决的核心问题。
MCP 协议在早期版本中使用的是 SSE+HTTP POST 的组合方案:客户端通过一个长连接的 SSE 通道接收服务端消息,再通过独立的 POST 请求发送消息。这种设计虽然可行,但存在明显的架构缺陷——两个独立的 HTTP 连接需要通过额外的机制(如 URL 中嵌入的 endpoint 路径)进行关联,增加了实现复杂度,也限制了部署灵活性。Streamable HTTP 正是为了取代这一方案而设计的。
13.1 设计动机:为什么需要 Streamable HTTP
要理解 Streamable HTTP 的设计选择,我们需要先看清它面临的约束条件。
第一个约束是 HTTP 的半双工特性。 与 WebSocket 这样的全双工协议不同,HTTP 本质上是请求-响应模式——客户端发起请求,服务端返回响应。在一次 HTTP 事务中,通信方向是固定的。这意味着服务端无法在没有客户端请求的情况下主动推送消息。
第二个约束是基础设施兼容性。 现实中的网络基础设施——代理服务器、负载均衡器、CDN、WAF——都是为 HTTP 设计的。WebSocket 虽然是全双工的,但并非所有中间件都能正确处理它。Streamable HTTP 选择在标准 HTTP 语义之上构建,最大程度地利用现有基础设施。
第三个约束是流式传输的需求。 MCP 的许多操作天然是流式的——工具调用可能需要数分钟才能完成,期间需要向客户端推送进度更新。普通的 HTTP 请求-响应无法满足这一需求。
Streamable HTTP 的核心设计思路可以用一句话概括:用 POST 请求承载客户端到服务端的消息,用 SSE(Server-Sent Events)流承载服务端到客户端的流式响应。 这不是两个独立的连接,而是将 SSE 嵌入到 POST 请求的响应体中。
13.2 协议机制:三种 HTTP 方法的分工
Streamable HTTP 使用同一个 URL endpoint,通过三种 HTTP 方法实现不同的通信语义。
POST:客户端到服务端的消息通道
客户端通过 POST 请求发送 JSON-RPC 消息。请求体是标准的 JSON-RPC 消息(或批量消息数组),Content-Type 为 application/json。关键的设计点在于 Accept 头——客户端必须同时声明接受 application/json 和 text/event-stream,因为服务端可能以任一格式响应。
服务端对 POST 请求有三种可能的响应方式:
- 202 Accepted:当客户端发送的是通知(notification)或响应(response)时,服务端无需返回内容,直接返回 202。
- 200 OK + JSON:当服务端选择非流式模式时,直接返回 JSON-RPC 响应。
- 200 OK + SSE:当服务端选择流式模式时,返回一个 SSE 流,在流中逐步推送消息。
来看 TypeScript 客户端中 _send 方法的核心逻辑(源码位于 packages/client/src/client/streamableHttp.ts):
typescript
// 设置请求头——同时声明接受 JSON 和 SSE
headers.set('content-type', 'application/json');
headers.set('accept', [...new Set(types)].join(', '));
// types 包含 'application/json' 和 'text/event-stream'
const response = await (this._fetch ?? fetch)(this._url, init);
// 从响应头中提取 session ID
const sessionId = response.headers.get('mcp-session-id');
if (sessionId) {
this._sessionId = sessionId;
}
// 202 表示服务端已接收但无需返回内容
if (response.status === 202) {
// 如果是 initialized 通知,开启 GET SSE 流
if (isInitializedNotification(message)) {
this._startOrAuthSse({ resumptionToken: undefined })
.catch(error => this.onerror?.(error));
}
return;
}
// 根据响应的 Content-Type 决定处理方式
const contentType = response.headers.get('content-type');
if (contentType?.includes('text/event-stream')) {
// 流式响应——交给 SSE 处理器
this._handleSseStream(response.body, { onresumptiontoken }, false);
} else if (contentType?.includes('application/json')) {
// 非流式响应——直接解析 JSON
const data = await response.json();
const responseMessages = Array.isArray(data)
? data.map(msg => JSONRPCMessageSchema.parse(msg))
: [JSONRPCMessageSchema.parse(data)];
for (const msg of responseMessages) {
this.onmessage?.(msg);
}
}这段代码揭示了 Streamable HTTP 的一个重要设计决策:客户端不预设服务端的响应格式。同一个请求,服务端可以根据自身能力和当前场景选择 JSON 或 SSE 响应。这种灵活性使得简单的服务端可以只实现 JSON 响应,而复杂的服务端可以利用 SSE 进行流式传输。
GET:服务端主动推送通道
GET 请求用于建立一个独立的 SSE 长连接,使服务端能够在没有客户端请求的情况下主动推送消息——例如服务端发起的请求(server-initiated requests)和通知。这个通道是可选的:如果服务端返回 405 Method Not Allowed,客户端会静默忽略,不报错。
服务端侧的实现限制了每个 session 只能有一个 GET 流。来看服务端的处理逻辑(源码位于 packages/server/src/server/streamableHttp.ts):
typescript
// 检查是否已有活跃的 GET 流
if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) {
return this.createJsonErrorResponse(
409, -32_000,
'Conflict: Only one SSE stream is allowed per session'
);
}DELETE:会话终止
客户端通过 DELETE 请求显式终止会话。服务端可以返回 200 OK 确认终止,也可以返回 405 Method Not Allowed 表示不支持客户端主动终止会话。
13.3 会话管理:有状态与无状态的权衡
Streamable HTTP 支持两种运行模式:有状态(stateful)和无状态(stateless),通过服务端是否提供 sessionIdGenerator 来决定。
在有状态模式下,服务端在处理 initialize 请求时生成一个 session ID,通过 mcp-session-id 响应头返回给客户端。此后,客户端必须在每个请求中携带这个 session ID。来看服务端的会话验证逻辑:
typescript
private validateSession(req: Request): Response | undefined {
if (this.sessionIdGenerator === undefined) {
// 无状态模式,跳过验证
return undefined;
}
if (!this._initialized) {
return this.createJsonErrorResponse(
400, -32_000, 'Bad Request: Server not initialized'
);
}
const sessionId = req.headers.get('mcp-session-id');
if (!sessionId) {
// 缺少 session ID,返回 400
return this.createJsonErrorResponse(
400, -32_000, 'Bad Request: Mcp-Session-Id header is required'
);
}
if (sessionId !== this.sessionId) {
// session ID 不匹配,返回 404
return this.createJsonErrorResponse(404, -32_001, 'Session not found');
}
return undefined;
}这里有一个重要的设计细节:无效的 session ID 返回 404 Not Found,而非 403 Forbidden。这是因为对于客户端来说,无效的 session 等同于不存在的资源——客户端应该重新初始化,而不是尝试修复认证。
Python 实现中(源码位于 src/mcp/server/streamable_http.py),session ID 还有格式验证的约束:
python
# 只允许可见 ASCII 字符(0x21-0x7E)
SESSION_ID_PATTERN = re.compile(r"^[\x21-\x7E]+$")
def __init__(self, mcp_session_id: str | None, ...):
if mcp_session_id is not None and not SESSION_ID_PATTERN.fullmatch(mcp_session_id):
raise ValueError(
"Session ID must only contain visible ASCII characters (0x21-0x7E)"
)无状态模式适用于简单场景或 serverless 部署——每个请求独立处理,不需要在服务端维护状态。有状态模式则适用于需要上下文延续的场景,例如多轮对话中的工具调用。
13.4 SSE 流处理:priming event 与断线重连
SSE(Server-Sent Events)是 Streamable HTTP 中实现流式传输的核心机制。与 WebSocket 不同,SSE 是单向的(服务端到客户端),但正好契合 MCP 的需求——在一个 POST 请求的响应中,服务端需要向客户端推送多条消息。
Priming Event:建立可恢复性的基石
当服务端配置了 EventStore(支持可恢复性)时,SSE 流的第一个事件是一个特殊的"priming event"——它只包含一个事件 ID 和空数据,不携带任何 JSON-RPC 消息。其目的是让客户端尽早获得一个可用于断线恢复的 event ID。
typescript
private async writePrimingEvent(
controller: ReadableStreamDefaultController<Uint8Array>,
encoder: InstanceType<typeof TextEncoder>,
streamId: string,
protocolVersion: string
): Promise<void> {
if (!this._eventStore) return;
// 旧版客户端不支持空数据的 SSE 事件,会尝试将空字符串解析为 JSON 而崩溃
if (protocolVersion < '2025-11-25') return;
const primingEventId = await this._eventStore.storeEvent(
streamId, {} as JSONRPCMessage
);
let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
if (this._retryInterval !== undefined) {
primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`;
}
controller.enqueue(encoder.encode(primingEvent));
}注意版本检查 protocolVersion < '2025-11-25' 这行代码——这是一个向后兼容的防御措施。早期版本的客户端会将每个 SSE 事件的 data 字段当作 JSON 解析,空数据会导致解析失败。只有 2025-11-25 及之后版本的协议才正确处理了这一点。
断线重连机制
网络连接不可能永远稳定。Streamable HTTP 提供了一套完整的断线重连机制,包括指数退避和服务端控制的重试间隔。
客户端的重连核心逻辑体现在 _handleSseStream 和 _scheduleReconnection 方法中:
typescript
private _handleSseStream(
stream: ReadableStream<Uint8Array> | null,
options: StartSSEOptions,
isReconnectable: boolean
): void {
let lastEventId: string | undefined;
let hasPrimingEvent = false; // 是否收到过带 ID 的事件
let receivedResponse = false; // 是否已收到最终响应
const processStream = async () => {
try {
const reader = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream({
onRetry: (retryMs: number) => {
// 捕获服务端指定的重试间隔
this._serverRetryMs = retryMs;
}
}))
.getReader();
while (true) {
const { value: event, done } = await reader.read();
if (done) break;
if (event.id) {
lastEventId = event.id;
hasPrimingEvent = true;
onresumptiontoken?.(event.id);
}
// 跳过空数据事件(priming event、keep-alive)
if (!event.data) continue;
// 解析并分发 JSON-RPC 消息
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
receivedResponse = true;
}
this.onmessage?.(message);
}
// 流正常关闭后,判断是否需要重连
const canResume = isReconnectable || hasPrimingEvent;
const needsReconnect = canResume && !receivedResponse;
if (needsReconnect && !this._abortController?.signal.aborted) {
this._scheduleReconnection({ resumptionToken: lastEventId, ... }, 0);
}
} catch (error) {
// 网络断开等异常,同样尝试重连
// ...
}
};
processStream();
}这段代码中有两个关键的布尔变量:
hasPrimingEvent:标记是否收到过带 event ID 的事件。只有收到过 priming event,POST 请求的 SSE 流才被视为可恢复的。receivedResponse:标记是否已收到最终的 JSON-RPC 响应。如果已经收到了完整响应,即使流断开也不需要重连——请求已经完成了。
重连的退避策略由 _getNextReconnectionDelay 计算:
typescript
private _getNextReconnectionDelay(attempt: number): number {
// 优先使用服务端指定的重试间隔
if (this._serverRetryMs !== undefined) {
return this._serverRetryMs;
}
// 否则使用指数退避
const initialDelay = this._reconnectionOptions.initialReconnectionDelay; // 默认 1s
const growFactor = this._reconnectionOptions.reconnectionDelayGrowFactor; // 默认 1.5
const maxDelay = this._reconnectionOptions.maxReconnectionDelay; // 默认 30s
return Math.min(initialDelay * Math.pow(growFactor, attempt), maxDelay);
}默认配置下,重试间隔为 1s、1.5s、2.25s...,上限 30s,最多重试 2 次。服务端可以通过 SSE 的 retry 字段覆盖这个间隔,从而实现服务端对客户端轮询频率的控制。
13.5 EventStore:事件持久化与重放
断线重连的另一半在服务端——当客户端带着 Last-Event-ID 头重新连接时,服务端需要能够重放在断线期间产生的事件。这就是 EventStore 的作用。
EventStore 是一个接口,定义了事件存储和重放的契约:
typescript
// TypeScript 接口(packages/server/src/server/streamableHttp.ts)
export interface EventStore {
storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise<EventId>;
getStreamIdForEventId?(eventId: EventId): Promise<StreamId | undefined>;
replayEventsAfter(
lastEventId: EventId,
{ send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
): Promise<StreamId>;
}python
# Python 抽象类(src/mcp/server/streamable_http.py)
class EventStore(ABC):
@abstractmethod
async def store_event(
self, stream_id: StreamId, message: JSONRPCMessage | None
) -> EventId: ...
@abstractmethod
async def replay_events_after(
self, last_event_id: EventId, send_callback: EventCallback
) -> StreamId | None: ...设计上有几个值得注意的点:
第一,EventStore 是可选的。 不配置 EventStore 的服务端不支持断线恢复,客户端断线后只能重新初始化。这降低了简单部署场景的实现复杂度。
第二,getStreamIdForEventId 方法是可选的。 TypeScript 中用 ? 标记,Python 中则不要求实现。如果提供了这个方法,服务端可以在重放前检查 event ID 的合法性并进行冲突检测;如果没有提供,服务端依赖 replayEventsAfter 返回的 stream ID 进行映射。
第三,事件存储与流传输解耦。 在 Python 实现的 message_router 中,无论客户端是否在线,消息都会被存入 EventStore:
python
# 无论客户端是否连接,都存储事件
event_id = None
if self._event_store:
event_id = await self._event_store.store_event(request_stream_id, message)
if request_stream_id in self._request_streams:
await self._request_streams[request_stream_id][0].send(
EventMessage(message, event_id)
)
else:
# 客户端未连接,但事件已存储,重连时可重放
logger.debug("Request stream not found, client might reconnect and replay.")这种设计意味着即使客户端在服务端处理请求的过程中完全断开,所有中间事件都不会丢失。客户端重连后,通过 Last-Event-ID 头可以获取断线期间的所有事件。
服务端处理重放请求的流程如下(TypeScript 实现):
typescript
private async replayEvents(lastEventId: string): Promise<Response> {
// 1. 如果提供了 getStreamIdForEventId,先验证 event ID 合法性
if (this._eventStore.getStreamIdForEventId) {
const streamId = await this._eventStore.getStreamIdForEventId(lastEventId);
if (!streamId) {
return this.createJsonErrorResponse(400, -32_000, 'Invalid event ID format');
}
// 检查该 stream 是否已有活跃连接
if (this._streamMapping.get(streamId) !== undefined) {
return this.createJsonErrorResponse(409, -32_000,
'Conflict: Stream already has an active connection');
}
}
// 2. 创建新的 SSE 流
const readable = new ReadableStream<Uint8Array>({ ... });
// 3. 调用 EventStore 重放历史事件
const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, {
send: async (eventId, message) => {
this.writeSSEEvent(streamController, encoder, message, eventId);
}
});
// 4. 注册新的 stream mapping,继续接收后续事件
this._streamMapping.set(replayedStreamId, { controller, encoder, cleanup: ... });
return new Response(readable, { headers });
}13.6 认证与 401 重试
远程传输必然涉及认证。Streamable HTTP 在传输层集成了 OAuth 认证流程,其核心设计是透明的 401 重试机制——当请求收到 401 Unauthorized 响应时,传输层会自动尝试重新认证并重发请求,对上层协议完全透明。
客户端的 _send 方法通过 isAuthRetry 参数实现了"最多重试一次"的语义:
typescript
private async _send(
message: JSONRPCMessage | JSONRPCMessage[],
options: ... | undefined,
isAuthRetry: boolean // 是否为认证重试
): Promise<void> {
// ...
if (response.status === 401 && this._authProvider) {
if (response.headers.has('www-authenticate')) {
const { resourceMetadataUrl, scope } = extractWWWAuthenticateParams(response);
this._resourceMetadataUrl = resourceMetadataUrl;
this._scope = scope;
}
if (this._authProvider.onUnauthorized && !isAuthRetry) {
// 第一次 401:调用 onUnauthorized 刷新凭证,然后重试
await this._authProvider.onUnauthorized({ response, serverUrl: this._url, ... });
return this._send(message, options, true); // 标记为重试
}
if (isAuthRetry) {
// 重试后仍然 401,抛出错误
throw new SdkError(SdkErrorCode.ClientHttpAuthentication,
'Server returned 401 after re-authentication', { status: 401 });
}
throw new UnauthorizedError();
}
// ...
}同样的机制也应用在 GET SSE 流的建立过程中(_startOrAuthSse 方法),确保了 GET 流也能在认证过期时自动恢复。
此外还有一个 403 Forbidden 的处理——当服务端返回 insufficient_scope 错误时,客户端会提取新的 scope 信息,尝试用更高权限重新认证。为防止无限循环,代码通过 _lastUpscopingHeader 追踪上一次尝试的 WWW-Authenticate 头,如果两次相同就终止重试:
typescript
if (this._lastUpscopingHeader === wwwAuthHeader) {
throw new SdkError(SdkErrorCode.ClientHttpForbidden,
'Server returned 403 after trying upscoping', { status: 403 });
}13.7 服务端架构:流映射与消息路由
服务端的实现需要解决一个核心的架构问题:如何将多个并发请求的响应路由到正确的 SSE 流?
TypeScript 服务端使用两层映射来解决这个问题:
_requestToStreamMapping: Map<RequestId, string>——将每个请求 ID 映射到一个 stream ID。_streamMapping: Map<string, StreamMapping>——将 stream ID 映射到实际的 SSE 流控制器。
当服务端需要发送消息时,send 方法的逻辑如下:
typescript
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
let requestId = options?.relatedRequestId;
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
requestId = message.id;
}
// 没有关联的请求 ID——发送到 GET 独立流
if (requestId === undefined) {
const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
if (standaloneSse?.controller && standaloneSse?.encoder) {
this.writeSSEEvent(standaloneSse.controller, standaloneSse.encoder, message, eventId);
}
return;
}
// 有关联的请求 ID——发送到对应的 POST 响应流
const streamId = this._requestToStreamMapping.get(requestId);
const stream = this._streamMapping.get(streamId);
if (stream?.controller && stream?.encoder) {
this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
}
// 如果是最终响应(result 或 error),检查是否所有请求都已完成
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
this._requestResponseMap.set(requestId, message);
const relatedIds = [...this._requestToStreamMapping.entries()]
.filter(([_, sid]) => sid === streamId)
.map(([id]) => id);
const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id));
if (allResponsesReady) {
stream.cleanup(); // 关闭 SSE 流
}
}
}Python 实现采用了不同的架构——使用 anyio 的 MemoryObjectStream 作为内部消息通道,通过一个专门的 message_router 协程负责消息分发:
这里的 message_router 是一个长期运行的协程,它从 write_stream 读取上层协议产生的消息,根据消息类型和关联的请求 ID 分发到对应的请求流。这种设计将"消息从哪里来"和"消息到哪里去"彻底解耦。
13.8 协议版本协商
Streamable HTTP 引入了 mcp-protocol-version 头用于版本协商。在初始化请求中,版本信息通过 JSON-RPC 消息的 params.protocolVersion 字段传递。在后续请求中,版本通过 HTTP 头传递。
服务端的版本验证逻辑如下:
typescript
private validateProtocolVersion(req: Request): Response | undefined {
const protocolVersion = req.headers.get('mcp-protocol-version');
if (protocolVersion !== null
&& !this._supportedProtocolVersions.includes(protocolVersion)) {
return this.createJsonErrorResponse(
400, -32_000,
`Bad Request: Unsupported protocol version: ${protocolVersion}`
);
}
return undefined;
}注意这里的逻辑:如果没有提供版本头,默认接受;只有提供了但不在支持列表中时才拒绝。这是一种渐进增强的设计——老客户端不发送版本头也能工作。
13.9 与其他传输方式的对比
理解 Streamable HTTP 的定位,需要将它与其他传输方式进行对比。
| 特性 | stdio | 旧版 SSE+POST | Streamable HTTP | WebSocket |
|---|---|---|---|---|
| 通信方向 | 双向 | 双向(两个连接) | 双向(POST+GET) | 全双工 |
| 连接数 | 进程管道 | 2 个 HTTP 连接 | 1-2 个 HTTP 连接 | 1 个连接 |
| 流式传输 | 天然支持 | 支持 | 支持 | 支持 |
| 断线恢复 | 不支持 | 不支持 | 支持(EventStore) | 需自行实现 |
| 基础设施兼容 | 不适用 | 良好 | 优秀 | 中等 |
| 无状态部署 | 不适用 | 困难 | 支持 | 困难 |
| 实现复杂度 | 低 | 中 | 高 | 中 |
Streamable HTTP vs 旧版 SSE+POST: 旧方案需要客户端先通过 GET 建立 SSE 连接,服务端返回一个 endpoint URL,客户端再用这个 URL 发送 POST 请求。两个连接通过 URL 关联。Streamable HTTP 将 SSE 嵌入 POST 响应,消除了这种耦合。更重要的是,Streamable HTTP 支持服务端选择 JSON 或 SSE 响应,使得简单场景无需 SSE 即可工作。
Streamable HTTP vs WebSocket: WebSocket 的优势在于全双工——任何一方可以随时发送消息。但 WebSocket 需要专门的基础设施支持,且不能利用 HTTP 的缓存、认证、负载均衡等成熟机制。Streamable HTTP 虽然不是真正的全双工,但通过 GET 流和 POST 响应流的组合,在 HTTP 语义之上实现了等效的双向通信能力。
何时选择 Streamable HTTP: 当 MCP 服务需要跨网络访问时,Streamable HTTP 是首选。它兼容几乎所有的 HTTP 基础设施,支持无状态部署(适合 serverless),并且通过 EventStore 提供了可选的断线恢复能力。只有在局域网内对延迟极其敏感、且基础设施完全可控的场景下,WebSocket 才值得考虑。
13.10 本章小结
Streamable HTTP 是 MCP 协议中最复杂的传输层实现,也是远程部署场景的核心基础设施。它的设计体现了几个重要的工程哲学:
在约束中寻找最优解。 HTTP 的半双工特性是硬约束,但通过将 SSE 嵌入响应体、通过独立的 GET 流接收服务端推送,Streamable HTTP 在不破坏 HTTP 语义的前提下实现了等效的双向通信。
渐进增强而非全有全无。 EventStore 是可选的,GET 流是可选的,会话管理是可选的,JSON 与 SSE 响应可以共存。一个最简单的 Streamable HTTP 服务端只需要处理 POST 请求并返回 JSON 响应,就能通过协议兼容性测试。
面向故障设计。 Priming event、resumption token、指数退避重连、事件持久化与重放——这些机制共同构成了一套完整的容错体系。在不可靠的网络环境中,它们确保了长时间运行的操作不会因为瞬时断线而丢失进度。
传输层的认证透明性。 401 自动重试机制使得上层协议完全不需要关心认证状态的变化。这种关注点分离让协议层可以专注于业务语义,而传输层负责处理所有的网络和认证复杂性。
在下一章中,我们将分析 SSE 和 WebSocket 这两种传输方式——它们在某些场景下仍然有独特的价值。