Appearance
第13章 流式输出与调试
13.1 引言
在构建 LLM 应用时,流式输出(streaming)不仅是用户体验的刚需,更是生产系统的可观测性基石。用户期望看到 AI 回复的"打字机效果",而开发者需要实时监控图执行的每一步——哪个节点正在运行、状态如何变化、中间结果是什么。LangGraph 通过七种 StreamMode 和精心设计的 StreamPart 类型体系,提供了从粗粒度到细粒度的完整流式输出方案。
LangGraph 1.1.6 引入了 v2 Stream API,它将所有流式数据统一为带类型标签的 StreamPart 字典,���底解决了 v1 中多种 stream_mode 混合使用时类型模糊的问题。v2 API 使得每个流式事件都是自描述的:你可以通过 part["type"] 直接判断事件类型,无需依赖上下文推断。
本章将从 StreamMode 的枚举定义出发,逐一分析七种模式的语义和实现,深入 StreamProtocol 和 StreamMessagesHandler 的源码,揭示流式输出在 Pregel 循环中的注入点和数据流转路径。
本章要点
- 七种 StreamMode 的语义区别——values/updates/custom/messages/checkpoints/tasks/debug
- v2 Stream API 的 StreamPart 类型体系——带类型标签的联合类型
- StreamWriter 的注入机制——如何在节点中发射自定义流式数据
- StreamMessagesHandler 的回调实现——LLM token 级别的流式捕获
- StreamProtocol 在 Pregel 循环中的集成方式
13.2 StreamMode 枚举
13.2.1 七种模式定义
StreamMode 定义在 langgraph/types.py 中,是一个字面量联合类型:
python
StreamMode = Literal[
"values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"
]每种模式对应一种输出粒度和视角:
| StreamMode | 输出内容 | 触发时机 | 典型消费者 |
|---|---|---|---|
values | 完整状态快照 | 每个超步结束后 | 前端展示完整状态 |
updates | 节点名 + 该节点的输出 | 每个节点执行后 | 调试、日志 |
custom | 用户自定义数据 | 节点内调用 StreamWriter | 进度条、中间结果 |
messages | LLM 消息 token | LLM 流式生成时 | 打字机效果 |
checkpoints | 检查点快照 | 检查点创建时 | 持久化监控 |
tasks | 任务开始/结束事件 | 任务生命周期变化 | 执行监控 |
debug | checkpoints + tasks | 同上两者 | 开发调试 |
13.2.2 模式组合
stream 方法支持同时订阅多种模式:
python
# v1 API:多模式返回元组
for mode, data in graph.stream(input, stream_mode=["values", "messages"]):
if mode == "values":
print(f"State: {data}")
elif mode == "messages":
print(f"Token: {data}")
# v2 API:统一的 StreamPart
for part in graph.stream(input, version="v2"):
if part["type"] == "values":
print(f"State: {part['data']}")
elif part["type"] == "messages":
msg, meta = part["data"]
print(f"Token: {msg.content}")13.3 StreamPart 类型体系
13.3.1 v2 API 的核心创新
v2 Stream API 将每个流式事件包装为一个带 type 字段的 TypedDict,使得类型判别成为可能:
python
StreamPart = TypeAliasType(
"StreamPart",
ValuesStreamPart[OutputT]
| UpdatesStreamPart
| MessagesStreamPart
| CustomStreamPart
| CheckpointStreamPart[StateT]
| TasksStreamPart
| DebugStreamPart[StateT],
type_params=(StateT, OutputT),
)每个具体的 StreamPart 类型都包含三个公共字段:
13.3.2 ns 字段:命名空间追踪
每个 StreamPart 都包含 ns(namespace)字段,它是一个字符串元组,标识事件来自图的哪个层级:
():来自顶层图("subgraph_name",):来自名为 subgraph_name 的子图("outer", "inner"):来自嵌套两层的子图
这使得在启用 subgraphs=True 时,消费者可以精确识别每个事件的来源。
13.3.3 ValuesStreamPart
python
class ValuesStreamPart(TypedDict, Generic[OutputT]):
type: Literal["values"]
ns: tuple[str, ...]
data: OutputT
interrupts: tuple[Interrupt, ...]values 模式在每个超步结束后发射完整的状态快照。data 的类型与图的输出类型一致。interrupts 字段记录了该步中发生的中断——这是 v2 API 特有的增强,v1 中中断信息需要通过 __interrupt__ 键间接获取。
13.3.4 UpdatesStreamPart
python
class UpdatesStreamPart(TypedDict):
type: Literal["updates"]
ns: tuple[str, ...]
data: dict[str, Any]updates 模式在每个节点执行后发射该节点的输出。data 是一个字典,键是节点名,值是节点返回的更新。如果多个节点在同一超步并行执行,每个节点的更新会单独发射。
13.3.5 MessagesStreamPart
python
class MessagesStreamPart(TypedDict):
type: Literal["messages"]
ns: tuple[str, ...]
data: tuple[AnyMessage, dict[str, Any]]messages 模式捕获 LLM 的 token 级流式输出。data 是一个二元组:消息对象(通常是 AIMessageChunk)和元数据字典。元数据包含 langgraph_step、langgraph_node、langgraph_triggers 等上下文信息。
13.3.6 CustomStreamPart
python
class CustomStreamPart(TypedDict):
type: Literal["custom"]
ns: tuple[str, ...]
data: Anycustom 模式承载用户通过 StreamWriter 发射的任意数据。这是最灵活的模式——你可以用它发送进度百分比、中间计算结果、或任何自定义结构。
13.4 StreamWriter 注入机制
13.4.1 定义与签名
python
StreamWriter = Callable[[Any], None]StreamWriter 的类型定义极为简洁——它就是一个接收任意参数、无返回值的可调用对象。它总是作为关键字参数注入到节点函数中: