Appearance
第12章 Send 与动态并行
12.1 引言
在前面的章节中,我们已经了解了 LangGraph 的静态图编译流程:节点通过边连接、Channel 传递状态、Pregel 按超步调度。然而,现实世界中的许多工作流并不是在编译时就能确定所有执行路径的。考虑这样一个场景——用户输入一篇文章,我们需要对文章中的每个段落分别进行翻译、摘要或情感分析,然后把所有结果汇总。段落的数量在编译时未知,只有在运行时读到输入后才能确定。
这就是 动态并行 的核心需求:在运行时根据数据决定要派生多少个并行任务,每个任务可以携带不同的输入,最终所有任务的输出通过 reducer 汇聚回主图状态。LangGraph 通过 Send 对象和 Topic Channel 精巧地解决了这个问题,实现了经典的 map-reduce 模式。
本章将从 Send 类的源码出发,追踪它在条件边中的返回、写入 TASKS Channel、被 prepare_next_tasks 消费、最终派生为并行 PregelExecutableTask 的完整链路。我们会深入 Topic Channel 的 pub/sub 语义,理解它如何支撑动态 fanout;我们还会分析条件边如何返回多个 Send 对象,以及 Command.goto 中嵌入 Send 的高级用法。
本章要点
Send对象的数据结构与语义——向指定节点发送自定义输入TopicChannel 的 pub/sub 机制——支持多值累积与逐步消费- Map-Reduce 模式的完整实现——从 fanout 到 aggregation
- 动态 fanout 的调度细节——
prepare_push_task_send的执行链路 - 条件边返回多个 Send 的工程实践与限制
12.2 Send 对象的设计
12.2.1 数据结构
Send 是 LangGraph 中最简洁的数据结构之一,定义在 langgraph/types.py 中:
python
class Send:
"""A message or packet to send to a specific node in the graph."""
__slots__ = ("node", "arg")
node: str
arg: Any
def __init__(self, /, node: str, arg: Any) -> None:
self.node = node
self.arg = arg
def __hash__(self) -> int:
return hash((self.node, self.arg))
def __repr__(self) -> str:
return f"Send(node={self.node!r}, arg={self.arg!r})"
def __eq__(self, value: object) -> bool:
return (
isinstance(value, Send)
and self.node == value.node
and self.arg == value.arg
)Send 只有两个字段:node 指定目标节点名,arg 是传递给该节点的自定义输入。使用 __slots__ 优化内存占用,实现了 __hash__ 和 __eq__ 以支持去重和集合操作。
关键设计决策在于 arg 的类型是 Any——这意味着 Send 携带的输入可以与图的主状态 schema 完全不同。当一个节点通过 Send 被调用时,它接收的不是完整的图状态,而是 Send 中指定的 arg。这是实现动态并行的核心:每个并行任务可以有自己独立的输入。
12.2.2 Send 与普通边的本质区别
在静态图中,边定义了数据流的拓扑关系,所有节点共享同一份状态。而 Send 改变了这个规则:
静态边下,NodeB 的每次执行都读取完整的图状态。而在动态 Send 模式下,同一个节点可以被实例化多次,每个实例接收不同的输入。这种区别在源码中体现为两种不同的任务类型——PULL 任务(由 Channel 更新触发)和 PUSH 任务(由 Send 对象创建)。
12.2.3 Send 的哈希与去重
Send 实现了 __hash__ 和 __eq__,这使得它可以被放入集合或用作字典键。哈希基于 (node, arg) 的元组,这意味着:
- 两个
Send("node_a", {"x": 1})是相等的 Send("node_a", {"x": 1})和Send("node_a", {"x": 2})是不同的- 这在 checkpoint 恢复时用于比对已执行的任务
需要注意的是,如果 arg 包含不可哈希的对象(如嵌套的列表),__hash__ 会抛出 TypeError。这是 Python 标准行为的自然延伸。
12.3 Topic Channel:动态并行的基础设施
12.3.1 Topic 的 Pub/Sub 语义
Send 对象最终被写入一个名为 __pregel_tasks(即常量 TASKS)的特殊 Channel,这个 Channel 的类型就是 Topic[Send]。Topic Channel 定义在 langgraph/channels/topic.py 中,它实现了经典的发布/订阅模式:
python
class Topic(
Generic[Value],
BaseChannel[Sequence[Value], Value | list[Value], list[Value]],
):
"""A configurable PubSub Topic."""
__slots__ = ("values", "accumulate")
def __init__(self, typ: type[Value], accumulate: bool = False) -> None:
super().__init__(typ)
self.accumulate = accumulate
self.values = list[Value]()
def update(self, values: Sequence[Value | list[Value]]) -> bool:
updated = False
if not self.accumulate:
updated = bool(self.values)
self.values = list[Value]()
if flat_values := tuple(_flatten(values)):
updated = True
self.values.extend(flat_values)
return updated
def get(self) -> Sequence[Value]:
if self.values:
return list(self.values)
else:
raise EmptyChannelError与 LastValue Channel(只保留最新值)不同,Topic 可以在一个超步内接收多个值。_flatten 辅助函数将嵌套列表展平,使得无论是单个 Send 还是 Send 列表都能正确处理。
12.3.2 _flatten 的展平逻辑
python
def _flatten(values: Sequence[Value | list[Value]]) -> Iterator[Value]:
for value in values:
if isinstance(value, list):
yield from value
else:
yield value这个简洁的生成器函数实现了一层展平。它的设计意图是:Channel 的 update 方法接收的是一个"来自各任务的写入值列表"。每个写入值本身可以是单个 Send,也可以是一个 Send 列表。_flatten 把这两层统一为一个平坦的 Send 序列。
12.3.3 accumulate 模式
accumulate 参数控制 Topic 的跨超步行为:
accumulate=False(默认):每个超步开始时清空旧值,只保留本步写入的新值。这是 TASKS Channel 使用的模式——每一轮只处理当前步产生的 Send。accumulate=True:值跨超步累积,适用于需要收集所有历史消息的场景。
12.3.4 Topic 与其他 Channel 的对比
| 特性 | LastValue | BinaryOperatorAggregate | Topic |
|---|---|---|---|
| 单步值数量 | 1 | 1(经 reducer 合并) | N |
| 更新语义 | 覆盖 | 聚合 | 追加 |
| 跨步保留 | 是 | 是 | 可选(accumulate) |
| 典型用途 | 普通状态字段 | Annotated[list, operator.add] | TASKS 分发 |
12.4 Map-Reduce 模式的完整实现
12.4.1 经典用法
让我们通过一个完整的 map-reduce 示例来理解 Send 的工作流程:
python
from typing import Annotated, TypedDict
from langgraph.types import Send
from langgraph.graph import END, START, StateGraph
import operator
class OverallState(TypedDict):
subjects: list[str]
jokes: Annotated[list[str], operator.add]
def continue_to_jokes(state: OverallState):
"""条件边函数:为每个 subject 生成一个 Send"""
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
def generate_joke(state: dict) -> dict:
"""工作节点:注意它接收的不是 OverallState,而是 Send 的 arg"""
return {"jokes": [f"Joke about {state['subject']}"]}
builder = StateGraph(OverallState)
builder.add_node("generate_joke", generate_joke)
builder.add_conditional_edges(START, continue_to_jokes)
builder.add_edge("generate_joke", END)
graph = builder.compile()
result = graph.invoke({"subjects": ["cats", "dogs", "robots"]})
# {'subjects': ['cats', 'dogs', 'robots'],
# 'jokes': ['Joke about cats', 'Joke about dogs', 'Joke about robots']}