Skip to content

第12章 Send 与动态并行

12.1 引言

在前面的章节中,我们已经了解了 LangGraph 的静态图编译流程:节点通过边连接、Channel 传递状态、Pregel 按超步调度。然而,现实世界中的许多工作流并不是在编译时就能确定所有执行路径的。考虑这样一个场景——用户输入一篇文章,我们需要对文章中的每个段落分别进行翻译、摘要或情感分析,然后把所有结果汇总。段落的数量在编译时未知,只有在运行时读到输入后才能确定。

这就是 动态并行 的核心需求:在运行时根据数据决定要派生多少个并行任务,每个任务可以携带不同的输入,最终所有任务的输出通过 reducer 汇聚回主图状态。LangGraph 通过 Send 对象和 Topic Channel 精巧地解决了这个问题,实现了经典的 map-reduce 模式。

本章将从 Send 类的源码出发,追踪它在条件边中的返回、写入 TASKS Channel、被 prepare_next_tasks 消费、最终派生为并行 PregelExecutableTask 的完整链路。我们会深入 Topic Channel 的 pub/sub 语义,理解它如何支撑动态 fanout;我们还会分析条件边如何返回多个 Send 对象,以及 Command.goto 中嵌入 Send 的高级用法。

本章要点

  1. Send 对象的数据结构与语义——向指定节点发送自定义输入
  2. Topic Channel 的 pub/sub 机制——支持多值累积与逐步消费
  3. Map-Reduce 模式的完整实现——从 fanout 到 aggregation
  4. 动态 fanout 的调度细节——prepare_push_task_send 的执行链路
  5. 条件边返回多个 Send 的工程实践与限制

12.2 Send 对象的设计

12.2.1 数据结构

Send 是 LangGraph 中最简洁的数据结构之一,定义在 langgraph/types.py 中:

python
class Send:
    """A message or packet to send to a specific node in the graph."""

    __slots__ = ("node", "arg")

    node: str
    arg: Any

    def __init__(self, /, node: str, arg: Any) -> None:
        self.node = node
        self.arg = arg

    def __hash__(self) -> int:
        return hash((self.node, self.arg))

    def __repr__(self) -> str:
        return f"Send(node={self.node!r}, arg={self.arg!r})"

    def __eq__(self, value: object) -> bool:
        return (
            isinstance(value, Send)
            and self.node == value.node
            and self.arg == value.arg
        )

Send 只有两个字段:node 指定目标节点名,arg 是传递给该节点的自定义输入。使用 __slots__ 优化内存占用,实现了 __hash____eq__ 以支持去重和集合操作。

关键设计决策在于 arg 的类型是 Any——这意味着 Send 携带的输入可以与图的主状态 schema 完全不同。当一个节点通过 Send 被调用时,它接收的不是完整的图状态,而是 Send 中指定的 arg。这是实现动态并行的核心:每个并行任务可以有自己独立的输入。

12.2.2 Send 与普通边的本质区别

在静态图中,边定义了数据流的拓扑关系,所有节点共享同一份状态。而 Send 改变了这个规则:

静态边下,NodeB 的每次执行都读取完整的图状态。而在动态 Send 模式下,同一个节点可以被实例化多次,每个实例接收不同的输入。这种区别在源码中体现为两种不同的任务类型——PULL 任务(由 Channel 更新触发)和 PUSH 任务(由 Send 对象创建)。

12.2.3 Send 的哈希与去重

Send 实现了 __hash____eq__,这使得它可以被放入集合或用作字典键。哈希基于 (node, arg) 的元组,这意味着:

  • 两个 Send("node_a", {"x": 1}) 是相等的
  • Send("node_a", {"x": 1})Send("node_a", {"x": 2}) 是不同的
  • 这在 checkpoint 恢复时用于比对已执行的任务

需要注意的是,如果 arg 包含不可哈希的对象(如嵌套的列表),__hash__ 会抛出 TypeError。这是 Python 标准行为的自然延伸。

12.3 Topic Channel:动态并行的基础设施

12.3.1 Topic 的 Pub/Sub 语义

Send 对象最终被写入一个名为 __pregel_tasks(即常量 TASKS)的特殊 Channel,这个 Channel 的类型就是 Topic[Send]Topic Channel 定义在 langgraph/channels/topic.py 中,它实现了经典的发布/订阅模式:

python
class Topic(
    Generic[Value],
    BaseChannel[Sequence[Value], Value | list[Value], list[Value]],
):
    """A configurable PubSub Topic."""

    __slots__ = ("values", "accumulate")

    def __init__(self, typ: type[Value], accumulate: bool = False) -> None:
        super().__init__(typ)
        self.accumulate = accumulate
        self.values = list[Value]()

    def update(self, values: Sequence[Value | list[Value]]) -> bool:
        updated = False
        if not self.accumulate:
            updated = bool(self.values)
            self.values = list[Value]()
        if flat_values := tuple(_flatten(values)):
            updated = True
            self.values.extend(flat_values)
        return updated

    def get(self) -> Sequence[Value]:
        if self.values:
            return list(self.values)
        else:
            raise EmptyChannelError

LastValue Channel(只保留最新值)不同,Topic 可以在一个超步内接收多个值。_flatten 辅助函数将嵌套列表展平,使得无论是单个 Send 还是 Send 列表都能正确处理。

12.3.2 _flatten 的展平逻辑

python
def _flatten(values: Sequence[Value | list[Value]]) -> Iterator[Value]:
    for value in values:
        if isinstance(value, list):
            yield from value
        else:
            yield value

这个简洁的生成器函数实现了一层展平。它的设计意图是:Channel 的 update 方法接收的是一个"来自各任务的写入值列表"。每个写入值本身可以是单个 Send,也可以是一个 Send 列表。_flatten 把这两层统一为一个平坦的 Send 序列。

12.3.3 accumulate 模式

accumulate 参数控制 Topic 的跨超步行为:

  • accumulate=False(默认):每个超步开始时清空旧值,只保留本步写入的新值。这是 TASKS Channel 使用的模式——每一轮只处理当前步产生的 Send。
  • accumulate=True:值跨超步累积,适用于需要收集所有历史消息的场景。

12.3.4 Topic 与其他 Channel 的对比

特性LastValueBinaryOperatorAggregateTopic
单步值数量11(经 reducer 合并)N
更新语义覆盖聚合追加
跨步保留可选(accumulate)
典型用途普通状态字段Annotated[list, operator.add]TASKS 分发

12.4 Map-Reduce 模式的完整实现

12.4.1 经典用法

让我们通过一个完整的 map-reduce 示例来理解 Send 的工作流程:

python
from typing import Annotated, TypedDict
from langgraph.types import Send
from langgraph.graph import END, START, StateGraph
import operator

class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]

def continue_to_jokes(state: OverallState):
    """条件边函数:为每个 subject 生成一个 Send"""
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def generate_joke(state: dict) -> dict:
    """工作节点:注意它接收的不是 OverallState,而是 Send 的 arg"""
    return {"jokes": [f"Joke about {state['subject']}"]}

builder = StateGraph(OverallState)
builder.add_node("generate_joke", generate_joke)
builder.add_conditional_edges(START, continue_to_jokes)
builder.add_edge("generate_joke", END)
graph = builder.compile()

result = graph.invoke({"subjects": ["cats", "dogs", "robots"]})
# {'subjects': ['cats', 'dogs', 'robots'],
#  'jokes': ['Joke about cats', 'Joke about dogs', 'Joke about robots']}

基于 VitePress 构建