Skip to content

第13章 流式输出与调试

13.1 引言

在构建 LLM 应用时,流式输出(streaming)不仅是用户体验的刚需,更是生产系统的可观测性基石。用户期望看到 AI 回复的"打字机效果",而开发者需要实时监控图执行的每一步——哪个节点正在运行、状态如何变化、中间结果是什么。LangGraph 通过七种 StreamMode 和精心设计的 StreamPart 类型体系,提供了从粗粒度到细粒度的完整流式输出方案。

LangGraph 1.1.6 引入了 v2 Stream API,它将所有流式数据统一为带类型标签的 StreamPart 字典,���底解决了 v1 中多种 stream_mode 混合使用时类型模糊的问题。v2 API 使得每个流式事件都是自描述的:你可以通过 part["type"] 直接判断事件类型,无需依赖上下文推断。

本章将从 StreamMode 的枚举定义出发,逐一分析七种模式的语义和实现,深入 StreamProtocolStreamMessagesHandler 的源码,揭示流式输出在 Pregel 循环中的注入点和数据流转路径。

本章要点

  1. 七种 StreamMode 的语义区别——values/updates/custom/messages/checkpoints/tasks/debug
  2. v2 Stream API 的 StreamPart 类型体系——带类型标签的联合类型
  3. StreamWriter 的注入机制——如何在节点中发射自定义流式数据
  4. StreamMessagesHandler 的回调实现——LLM token 级别的流式捕获
  5. StreamProtocol 在 Pregel 循环中的集成方式

13.2 StreamMode 枚举

13.2.1 七种模式定义

StreamMode 定义在 langgraph/types.py 中,是一个字面量联合类型:

python
StreamMode = Literal[
    "values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"
]

每种模式对应一种输出粒度和视角:

StreamMode输出内容触发时机典型消费者
values完整状态快照每个超步结束后前端展示完整状态
updates节点名 + 该节点的输出每个节点执行后调试、日志
custom用户自定义数据节点内调用 StreamWriter进度条、中间结果
messagesLLM 消息 tokenLLM 流式生成时打字机效果
checkpoints检查点快照检查点创建时持久化监控
tasks任务开始/结束事件任务生命周期变化执行监控
debugcheckpoints + tasks同上两者开发调试

13.2.2 模式组合

stream 方法支持同时订阅多种模式:

python
# v1 API:多模式返回元组
for mode, data in graph.stream(input, stream_mode=["values", "messages"]):
    if mode == "values":
        print(f"State: {data}")
    elif mode == "messages":
        print(f"Token: {data}")

# v2 API:统一的 StreamPart
for part in graph.stream(input, version="v2"):
    if part["type"] == "values":
        print(f"State: {part['data']}")
    elif part["type"] == "messages":
        msg, meta = part["data"]
        print(f"Token: {msg.content}")

13.3 StreamPart 类型体系

13.3.1 v2 API 的核心创新

v2 Stream API 将每个流式事件包装为一个带 type 字段的 TypedDict,使得类型判别成为可能:

python
StreamPart = TypeAliasType(
    "StreamPart",
    ValuesStreamPart[OutputT]
    | UpdatesStreamPart
    | MessagesStreamPart
    | CustomStreamPart
    | CheckpointStreamPart[StateT]
    | TasksStreamPart
    | DebugStreamPart[StateT],
    type_params=(StateT, OutputT),
)

每个具体的 StreamPart 类型都包含三个公共字段:

13.3.2 ns 字段:命名空间追踪

每个 StreamPart 都包含 ns(namespace)字段,它是一个字符串元组,标识事件来自图的哪个层级:

  • ():来自顶层图
  • ("subgraph_name",):来自名为 subgraph_name 的子图
  • ("outer", "inner"):来自嵌套两层的子图

这使得在启用 subgraphs=True 时,消费者可以精确识别每个事件的来源。

13.3.3 ValuesStreamPart

python
class ValuesStreamPart(TypedDict, Generic[OutputT]):
    type: Literal["values"]
    ns: tuple[str, ...]
    data: OutputT
    interrupts: tuple[Interrupt, ...]

values 模式在每个超步结束后发射完整的状态快照。data 的类型与图的输出类型一致。interrupts 字段记录了该步中发生的中断——这是 v2 API 特有的增强,v1 中中断信息需要通过 __interrupt__ 键间接获取。

13.3.4 UpdatesStreamPart

python
class UpdatesStreamPart(TypedDict):
    type: Literal["updates"]
    ns: tuple[str, ...]
    data: dict[str, Any]

updates 模式在每个节点执行后发射该节点的输出。data 是一个字典,键是节点名,值是节点返回的更新。如果多个节点在同一超步并行执行,每个节点的更新会单独发射。

13.3.5 MessagesStreamPart

python
class MessagesStreamPart(TypedDict):
    type: Literal["messages"]
    ns: tuple[str, ...]
    data: tuple[AnyMessage, dict[str, Any]]

messages 模式捕获 LLM 的 token 级流式输出。data 是一个二元组:消息对象(通常是 AIMessageChunk)和元数据字典。元数据包含 langgraph_steplanggraph_nodelanggraph_triggers 等上下文信息。

13.3.6 CustomStreamPart

python
class CustomStreamPart(TypedDict):
    type: Literal["custom"]
    ns: tuple[str, ...]
    data: Any

custom 模式承载用户通过 StreamWriter 发射的任意数据。这是最灵活的模式——你可以用它发送进度百分比、中间计算结果、或任何自定义结构。

13.4 StreamWriter 注入机制

13.4.1 定义与签名

python
StreamWriter = Callable[[Any], None]

StreamWriter 的类型定义极为简洁——它就是一个接收任意参数、无返回值的可调用对象。它总是作为关键字参数注入到节点函数中:

基于 VitePress 构建