Skip to content

第5章 流式消息与状态机

"In a stream, every drop of water knows the way." -- Lao Tzu

本章要点

  • Claude Code 消息类型体系的完整层级:从核心四元组到上下文管理消息,再到流事件
  • queryModelWithStreaming 中 API SSE 事件的逐块累积与 AssistantMessage 的构造过程
  • handleMessageFromStream 如何将异构消息流映射为 React/Ink 组件可消费的状态更新
  • CompactBoundaryMessageTombstoneMessageToolUseSummaryMessage 三种特殊消息的设计意图
  • 用户取消 (AbortController)、流式超时 (Idle Watchdog)、模型降级 (Fallback) 三条错误恢复路径
  • 流式工具执行器 StreamingToolExecutor 如何实现工具与模型响应的并行流水线

上一章我们深入剖析了 Query 引擎的循环架构,理解了 query.ts 如何以 while(true) 驱动整个 Agent 回合。但在那条宏观流水线上,有一个关键环节我们尚未展开:从 API 返回的原始 SSE (Server-Sent Events) 字节流,如何被解析为结构化的消息对象,再经过一系列变换最终呈现在用户面前?

这正是本章要回答的核心问题。流式消息系统是 Claude Code 的神经网络——它不只是简单地搬运数据,而是在搬运过程中完成类型判定、状态追踪、UI 驱动、错误拦截等多重职责。理解这套系统,是掌握 Claude Code 从"能用"到"好用"之间那道鸿沟的关键。

在传统的请求-响应模式中,客户端发送请求,等待服务器返回完整响应,然后一次性处理。这种模式对于短文本生成尚可接受,但当模型需要输出数千 token 的代码、执行多轮工具调用时,用户将面临漫长的白屏等待。流式处理彻底改变了这个范式:服务器在生成每个 token 时就立即推送给客户端,客户端在接收的同时就开始渲染。这不仅仅是体验优化——它从根本上改变了系统的架构约束,要求每个环节都能处理"不完整"的数据,并在数据逐步完善的过程中维护一致的状态。

5.1 消息类型体系

以下类图展示了 Claude Code 消息类型体系的完整层级关系:

5.1.1 设计哲学:联合类型的分发艺术

Claude Code 的消息体系建立在 TypeScript 的判别联合类型 (Discriminated Union) 之上。所有消息通过 type 字段区分,每种类型携带与其职责严格匹配的字段集合。这种设计使得编译器能够在每个 switch 分支中自动收窄类型,消除运行时的类型断言需求。

src/utils/messages.ts 的导入声明中,我们可以看到完整的消息类型版图:

typescript
// 文件:src/utils/messages.ts(节选导入声明)
import type {
  AssistantMessage,
  AttachmentMessage,
  Message,
  NormalizedAssistantMessage,
  NormalizedMessage,
  NormalizedUserMessage,
  ProgressMessage,
  RequestStartEvent,
  StreamEvent,
  SystemAgentsKilledMessage,
  SystemAPIErrorMessage,
  SystemApiMetricsMessage,
  SystemAwaySummaryMessage,
  SystemBridgeStatusMessage,
  SystemCompactBoundaryMessage,
  SystemInformationalMessage,
  SystemLocalCommandMessage,
  SystemMemorySavedMessage,
  SystemMessage,
  SystemMicrocompactBoundaryMessage,
  SystemPermissionRetryMessage,
  SystemScheduledTaskFireMessage,
  SystemStopHookSummaryMessage,
  SystemTurnDurationMessage,
  TombstoneMessage,
  ToolUseSummaryMessage,
  UserMessage,
} from '../types/message.js'

这个导入列表揭示了一个关键的架构决策:消息类型并非简单的四元组 (System / User / Assistant / Tool Result),而是一个经过精心分层的类型层级,包含核心消息类型系统消息子类型流事件类型上下文管理类型四个维度。

值得注意的是,这些类型定义在独立的 types/message.ts 文件中(编译后通过 .js 引用),然后被 utils/messages.ts 这个超过五千行的工具模块所消费。类型定义与工具函数的分离确保了类型可以被跨模块引用而不产生循环依赖——这在 Claude Code 这样的大型项目中是至关重要的架构纪律。整个消息系统遵循"类型在上、工具在中、组件在下"的三层结构:类型层定义数据的形状,工具层提供创建和变换消息的纯函数,组件层负责渲染和交互。

5.1.2 核心消息四元组

Claude Code 的核心消息类型与 Claude API 的消息角色模型一一对应,但在此基础上添加了大量元数据:

AssistantMessage 是模型响应的载体。每个 AssistantMessage 都包裹着一个完整的 API BetaMessage 对象,同时附加了 Claude Code 特有的状态字段:

typescript
// 文件:src/utils/messages.ts(baseCreateAssistantMessage 函数,展示字段结构)
function baseCreateAssistantMessage({
  content,
  isApiErrorMessage = false,
  apiError,
  error,
  errorDetails,
  isVirtual,
  usage = { /* 默认零值 */ },
}: { ... }): AssistantMessage {
  return {
    type: 'assistant',
    uuid: randomUUID(),
    timestamp: new Date().toISOString(),
    message: {
      id: randomUUID(),
      container: null,
      model: SYNTHETIC_MODEL,
      role: 'assistant',
      stop_reason: 'stop_sequence',
      stop_sequence: '',
      type: 'message',
      usage,
      content,
      context_management: null,
    },
    requestId: undefined,
    apiError,       // 'max_output_tokens' | 'prompt_too_long' 等
    error,          // SDK 层面的错误分类
    errorDetails,   // 人类可读的错误描述
    isApiErrorMessage,  // 标记此消息是否为合成错误消息
    isVirtual,      // 标记是否为非 API 产生的虚拟消息
  }
}

这里有一个精妙的设计:apiError 字段的存在意味着 AssistantMessage 不仅承载模型的正常响应,还承载 API 层面的错误信息。当模型输出超过 max_output_tokens 限制、或请求超过上下文窗口时,系统不会抛出异常,而是生成一个带有 apiError 标记的 AssistantMessage。这让上层的恢复逻辑可以用统一的消息处理管道来处理正常响应和错误——第4章中分析的 "扣留-恢复" 机制正是建立在这个设计之上。

UserMessage 是用户输入和工具结果的统一载体:

typescript
// 文件:src/utils/messages.ts
export function createUserMessage({
  content,
  isMeta,
  isVisibleInTranscriptOnly,
  isVirtual,
  isCompactSummary,
  toolUseResult,
  mcpMeta,
  uuid,
  timestamp,
  imagePasteIds,
  sourceToolAssistantUUID,
  permissionMode,
  origin,
  ...
}: { ... }): UserMessage {
  const m: UserMessage = {
    type: 'user',
    message: {
      role: 'user',
      content: content || NO_CONTENT_MESSAGE,
    },
    isMeta,                   // 元消息,不显示给用户
    isVisibleInTranscriptOnly, // 仅在转录中可见
    isVirtual,                // 非真实用户输入
    isCompactSummary,         // 压缩摘要标记
    toolUseResult,            // 工具执行的结构化输出
    sourceToolAssistantUUID,  // 对应的 tool_use 所在 assistant 消息
    permissionMode,           // 发送时的权限模式快照
    origin,                   // 消息来源:human / hook / slash_command
    uuid: (uuid as UUID) || randomUUID(),
    timestamp: timestamp ?? new Date().toISOString(),
    ...
  }
  return m
}

UserMessagecontent 字段可以是纯文本字符串,也可以是 ContentBlockParam[] 数组(包含 tool_resultimagetext 等块类型)。当作为工具结果使用时,content 数组中会包含 tool_result 类型的块,并通过 sourceToolAssistantUUID 字段追溯到发起工具调用的那条 AssistantMessage

SystemMessage 是系统内部信息的载体。与前两种消息不同,SystemMessage 通过 subtype 字段进一步细分为十余种子类型,每种子类型携带不同的附加字段。这种设计避免了创建过多顶层类型带来的 switch 分支爆炸问题。

5.1.3 系统消息的子类型谱系

系统消息的 subtype 字段构成了一个完整的运行时事件谱系:

子类型工厂函数用途
informationalcreateSystemMessage()通用提示,如模型切换通知
api_errorcreateSystemAPIErrorMessage()API 重试等待提示
compact_boundarycreateCompactBoundaryMessage()上下文压缩分界标记
microcompact_boundarycreateMicrocompactBoundaryMessage()微压缩分界标记
local_commandcreateCommandInputMessage()本地斜杠命令的输入记录
permission_retrycreatePermissionRetryMessage()权限授予后的重试通知
bridge_statuscreateBridgeStatusMessage()远程控制桥接状态
stop_hook_summarycreateStopHookSummaryMessage()停止钩子执行摘要
scheduled_task_firecreateScheduledTaskFireMessage()定时任务触发通知
turn_durationcreateTurnDurationMessage()回合耗时统计(内部使用)
agents_killedcreateAgentsKilledMessage()子代理终止通知

每种子类型的工厂函数都确保了字段完整性——调用者无需手动组装 timestampuuid 等公共字段。例如 createCompactBoundaryMessage 函数:

typescript
// 文件:src/utils/messages.ts
export function createCompactBoundaryMessage(
  trigger: 'manual' | 'auto',
  preTokens: number,
  lastPreCompactMessageUuid?: UUID,
  userContext?: string,
  messagesSummarized?: number,
): SystemCompactBoundaryMessage {
  return {
    type: 'system',
    subtype: 'compact_boundary',
    content: 'Conversation compacted',
    isMeta: false,
    timestamp: new Date().toISOString(),
    uuid: randomUUID(),
    level: 'info',
    compactMetadata: {
      trigger,            // 'manual' 或 'auto'
      preTokens,          // 压缩前的 token 数
      userContext,         // 用户上下文快照
      messagesSummarized, // 被摘要的消息数量
    },
    ...(lastPreCompactMessageUuid && {
      logicalParentUuid: lastPreCompactMessageUuid,
    }),
  }
}

5.1.4 流事件与元消息

除了持久化的消息类型外,还有三种"短生命周期"的事件类型,它们在流式管道中流转,但不会被存入消息历史:

StreamEvent 是 API SSE 事件的薄包装。它携带原始的 BetaRawMessageStreamEvent(如 message_startcontent_block_startcontent_block_deltacontent_block_stopmessage_deltamessage_stop),以及可选的 ttftMs(首 token 响应时间)字段。StreamEvent 的职责是驱动 UI 的实时更新——spinner 模式切换、流式文本显示、工具输入预览等。

RequestStartEvent 是查询循环每次迭代开始时发出的信号,类型为 { type: 'stream_request_start' }。它告知 UI 层新的 API 请求即将发起,触发 spinner 进入 "requesting" 状态。

ProgressMessage 是工具执行过程中的进度报告。每个 ProgressMessage 都绑定到一个特定的 toolUseIDparentToolUseID,UI 层据此将进度信息关联到对应的工具调用 UI 组件中:

typescript
// 文件:src/utils/messages.ts
export function createProgressMessage<P extends Progress>({
  toolUseID,
  parentToolUseID,
  data,
}: {
  toolUseID: string
  parentToolUseID: string
  data: P
}): ProgressMessage<P> {
  return {
    type: 'progress',
    data,             // 工具特定的进度数据
    toolUseID,        // 当前工具调用的 ID
    parentToolUseID,  // 父级工具调用 ID(用于子代理场景)
    uuid: randomUUID(),
    timestamp: new Date().toISOString(),
  }
}

AttachmentMessage 是附件信息的载体,用于在消息流中注入上下文信息——如记忆内容、技能发现结果、钩子执行结果、排队命令等。它通过 attachment.type 字段区分不同种类的附件。AttachmentMessage 在消息流中扮演着"侧信道"的角色:它不是对话的直接内容,而是为模型提供额外的决策依据。例如,当自动记忆系统检测到与当前对话相关的历史偏好时,会通过 AttachmentMessage 注入上下文;当钩子系统在工具执行前后产生输出时,同样通过 AttachmentMessage 传递给模型。这种设计将"核心对话"和"辅助上下文"清晰分离,使得压缩系统在需要精简上下文时,可以优先剔除附件而不影响对话的连贯性。

5.1.5 消息归一化:从多内容块到单内容块

一个 API 响应可能包含多个内容块(thinking + text + tool_use),但 UI 渲染需要每条消息对应一个内容块。normalizeMessages 函数承担了这个拆分职责:

typescript
// 文件:src/utils/messages.ts
export function normalizeMessages(messages: Message[]): NormalizedMessage[] {
  let isNewChain = false
  return messages.flatMap(message => {
    switch (message.type) {
      case 'assistant': {
        isNewChain = isNewChain || message.message.content.length > 1
        return message.message.content.map((_, index) => {
          const uuid = isNewChain
            ? deriveUUID(message.uuid, index)
            : message.uuid
          return {
            type: 'assistant' as const,
            message: { ...message.message, content: [_] },
            uuid,
            // ...其他字段
          } as NormalizedAssistantMessage
        })
      }
      // ...user 消息类似处理
    }
  })
}

deriveUUID 函数通过将索引编码到 UUID 的末12位来生成确定性的派生 UUID,确保同一条消息在不同渲染周期中始终获得相同的 key——这对 React 的 reconciliation 至关重要。

5.2 流式架构

以下时序图展示了从 API SSE 事件到 UI 渲染的完整流式数据通路:

5.2.1 AsyncGenerator:数据流的脊柱

Claude Code 的流式架构建立在 JavaScript 的 AsyncGenerator 协议之上。这不是偶然的技术选择,而是经过深思熟虑的架构决策。

从最底层的 API 调用到最上层的 UI 消费,整条数据管道由三层嵌套的异步生成器组成:

queryModelWithStreaming()    底层:API SSE -> StreamEvent | AssistantMessage
       |
       v
queryLoop()                  中层:编排工具执行、错误恢复、续行判断
       |
       v
REPL.tsx onQueryEvent()      上层:React 状态更新

基于 VitePress 构建