Skip to content

第6章 Pregel 执行引擎

6.1 引言

Pregel 是 LangGraph 的心脏。当你调用 compiled_graph.invoke()compiled_graph.stream() 时,真正驱动计算的是 Pregel 执行引擎——一个基于 Google Pregel 论文思想、专门为 AI 工作流定制的 BSP(Bulk Synchronous Parallel)运行时。

Google 在 2010 年发表的 Pregel 论文描述了一种用于大规模图计算的编程模型:计算以超步(superstep)为单位推进,每个超步中所有活跃节点并行执行,执行结果在超步之间通过消息传递可见。LangGraph 将这个模型适配到了 AI 工作流场景——"节点"是 AI Agent 或工具调用,"消息"是 Channel 中的状态更新,"超步"是一轮任务执行与状态同步。

本章将深入剖析以下核心组件:

  • Pregel 类(pregel/main.py)—— 执行引擎的公共接口
  • PregelLooppregel/_loop.py)—— 执行循环的核心状态机
  • SyncPregelLoop / AsyncPregelLoop —— 同步和异步的具体实现
  • prepare_next_tasks 算法 —— 决定每个超步执行哪些任务
  • apply_writes 算法 —— 在超步之间更新 Channel 状态
  • 版本追踪机制 —— 高效判定哪些节点需要被触发
  • max_steps 停止条件 —— 防止无限循环的安全阀

本章要点

  1. Pregel 类是 LangGraph 的统一运行时接口,invoke()stream() 都构建在同一个执行循环上
  2. PregelLoop 是一个状态机,核心循环为 tick() -> 执行任务 -> after_tick()
  3. prepare_next_tasks 通过 Channel 版本比较决定哪些节点在下一步执行
  4. apply_writes 在超步之间原子地更新所有 Channel,确保步内隔离
  5. 版本追踪使用 channel_versionsversions_seen 两张表实现高效的变更检测
  6. recursion_limit 通过 step > stop 条件提供安全停止保证

6.2 Pregel 类:执行引擎的入口

Pregel 类定义在 pregel/main.py 中,是 CompiledStateGraph 的父类。它持有执行所需的全部配置:

python
class Pregel(PregelProtocol, Generic[StateT, ContextT, InputT, OutputT]):
    nodes: dict[str, PregelNode]         # 编译后的节点
    channels: dict[str, BaseChannel | ManagedValueSpec]  # 通道定义
    input_channels: str | Sequence[str]  # 输入通道
    output_channels: str | Sequence[str] # 输出通道
    stream_channels: str | Sequence[str] | None
    trigger_to_nodes: Mapping[str, Sequence[str]]  # 优化映射
    checkpointer: Checkpointer           # 检查点存储
    store: BaseStore | None              # 持久化存储
    cache: BaseCache | None              # 节点结果缓存
    retry_policy: Sequence[RetryPolicy]  # 全局重试策略
    cache_policy: CachePolicy | None     # 全局缓存策略
    interrupt_before_nodes: All | Sequence[str]
    interrupt_after_nodes: All | Sequence[str]
    step_timeout: float | None           # 步超时

6.2.1 invoke() 和 stream() 的关系

在 Pregel 的设计中,invoke() 是基于 stream() 实现的——它调用 stream() 收集所有输出,然后返回最终值:

python
def invoke(self, input, config=None, *, stream_mode="values", ...):
    latest = None
    for chunk in self.stream(
        input, config,
        stream_mode=["updates", "values"] if stream_mode == "values"
        else stream_mode,
        ...
    ):
        if stream_mode == "values":
            mode, payload = chunk
            if mode == "values":
                latest = payload
    return latest

这意味着 stream() 才是真正的执行入口。所有的执行逻辑都围绕流式输出构建。

6.2.2 stream() 的执行框架

stream() 方法的核心结构如下:

python
def stream(self, input, config=None, *, stream_mode=None, ...):
    # 1. 准备配置
    stream_modes, output_keys, interrupt_before_, interrupt_after_, \
        checkpointer, store, cache, durability_ = self._defaults(...)

    # 2. 构建流式队列
    stream = SyncQueue()

    # 3. 进入 PregelLoop 上下文
    with SyncPregelLoop(
        input, stream=StreamProtocol(stream.put, stream_modes),
        config=config, checkpointer=checkpointer,
        nodes=self.nodes, specs=self.channels,
        ...
    ) as loop:
        # 4. 创建 Runner
        runner = PregelRunner(
            submit=weakref.WeakMethod(loop.submit),
            put_writes=weakref.WeakMethod(loop.put_writes),
        )
        # 5. BSP 主循环
        while loop.tick():
            for _ in runner.tick(
                [t for t in loop.tasks.values() if not t.writes],
                timeout=self.step_timeout,
                schedule_task=loop.accept_push,
            ):
                yield from _output(stream.get, ...)
            loop.after_tick()

这段代码精确地体现了 BSP 模型的三个阶段:

6.3 PregelLoop:执行循环的状态机

PregelLoop 是执行循环的核心,定义在 pregel/_loop.py 中。它不是一个简单的 while 循环,而是一个精心设计的状态机。

6.3.1 状态定义

python
class PregelLoop:
    # 配置
    config: RunnableConfig
    nodes: Mapping[str, PregelNode]
    specs: Mapping[str, BaseChannel | ManagedValueSpec]
    input_keys: str | Sequence[str]
    output_keys: str | Sequence[str]
    stream_keys: str | Sequence[str]

    # 运行时状态
    step: int                    # 当前超步编号
    stop: int                    # 最大步数
    status: str                  # 状态机状态
    tasks: dict[str, PregelExecutableTask]  # 当前步的任务
    output: Any | None           # 最终输出
    updated_channels: set[str] | None  # 上一步更新的通道

    # Checkpoint 状态
    checkpoint: Checkpoint
    checkpoint_config: RunnableConfig
    checkpoint_metadata: CheckpointMetadata
    checkpoint_pending_writes: list[PendingWrite]
    checkpoint_previous_versions: dict[str, str | float | int]

    # Channel 和 Managed Values
    channels: Mapping[str, BaseChannel]
    managed: ManagedValueMapping

6.3.2 状态机转换

PregelLoop 的 status 字段可以是以下值之一:

6.3.3 SyncPregelLoop 的生命周期

SyncPregelLoop 实现为 Python 上下文管理器,其 __enter__ 方法执行完整的初始化:

python
def __enter__(self) -> Self:
    # 1. 获取 Checkpoint
    if not self.checkpointer:
        saved = None
    elif self.checkpoint_config[CONF].get(CONFIG_KEY_CHECKPOINT_ID):
        saved = self.checkpointer.get_tuple(self.checkpoint_config)
    else:
        saved = self.checkpointer.get_tuple(self.checkpoint_config)

    if saved is None:
        saved = CheckpointTuple(
            self.checkpoint_config, empty_checkpoint(),
            {"step": -2}, None, []
        )
    elif self._migrate_checkpoint is not None:
        self._migrate_checkpoint(saved.checkpoint)

    # 2. 恢复 Checkpoint 状态
    self.checkpoint = saved.checkpoint
    self.checkpoint_metadata = saved.metadata
    self.checkpoint_pending_writes = [...]

    # 3. 初始化后台执行器
    self.submit = self.stack.enter_context(
        BackgroundExecutor(self.config)
    )

    # 4. 从 Checkpoint 恢复 Channel 状态
    self.channels, self.managed = channels_from_checkpoint(
        self.specs, self.checkpoint
    )

    # 5. 计算步数边界
    self.step = self.checkpoint_metadata["step"] + 1
    self.stop = self.step + self.config["recursion_limit"] + 1

    # 6. 处理首步输入
    self.updated_channels = self._first(
        input_keys=self.input_keys,
        updated_channels=...
    )
    return self

这里的关键点:

  • Checkpoint 恢复:如果存在之前的 Checkpoint(例如从中断点恢复),直接加载而非从空状态开始
  • step 的计算:从 Checkpoint 的元数据中恢复步数,确保恢复执行时步数连续
  • stop 的计算step + recursion_limit + 1+1 是因为比较条件是 step > stop(严格大于),所以需要多一步的余量
  • _first():处理首步输入——将用户输入写入 Channel,或者在恢复执行时跳过输入处理

6.4 _first():首步输入处理

_first() 方法是执行循环中最复杂的初始化逻辑,它需要区分三种场景:

基于 VitePress 构建