Appearance
第6章 Pregel 执行引擎
6.1 引言
Pregel 是 LangGraph 的心脏。当你调用 compiled_graph.invoke() 或 compiled_graph.stream() 时,真正驱动计算的是 Pregel 执行引擎——一个基于 Google Pregel 论文思想、专门为 AI 工作流定制的 BSP(Bulk Synchronous Parallel)运行时。
Google 在 2010 年发表的 Pregel 论文描述了一种用于大规模图计算的编程模型:计算以超步(superstep)为单位推进,每个超步中所有活跃节点并行执行,执行结果在超步之间通过消息传递可见。LangGraph 将这个模型适配到了 AI 工作流场景——"节点"是 AI Agent 或工具调用,"消息"是 Channel 中的状态更新,"超步"是一轮任务执行与状态同步。
本章将深入剖析以下核心组件:
Pregel类(pregel/main.py)—— 执行引擎的公共接口PregelLoop(pregel/_loop.py)—— 执行循环的核心状态机SyncPregelLoop/AsyncPregelLoop—— 同步和异步的具体实现prepare_next_tasks算法 —— 决定每个超步执行哪些任务apply_writes算法 —— 在超步之间更新 Channel 状态- 版本追踪机制 —— 高效判定哪些节点需要被触发
max_steps停止条件 —— 防止无限循环的安全阀
本章要点
- Pregel 类是 LangGraph 的统一运行时接口,
invoke()和stream()都构建在同一个执行循环上 PregelLoop是一个状态机,核心循环为tick()-> 执行任务 ->after_tick()prepare_next_tasks通过 Channel 版本比较决定哪些节点在下一步执行apply_writes在超步之间原子地更新所有 Channel,确保步内隔离- 版本追踪使用
channel_versions和versions_seen两张表实现高效的变更检测 recursion_limit通过step > stop条件提供安全停止保证
6.2 Pregel 类:执行引擎的入口
Pregel 类定义在 pregel/main.py 中,是 CompiledStateGraph 的父类。它持有执行所需的全部配置:
python
class Pregel(PregelProtocol, Generic[StateT, ContextT, InputT, OutputT]):
nodes: dict[str, PregelNode] # 编译后的节点
channels: dict[str, BaseChannel | ManagedValueSpec] # 通道定义
input_channels: str | Sequence[str] # 输入通道
output_channels: str | Sequence[str] # 输出通道
stream_channels: str | Sequence[str] | None
trigger_to_nodes: Mapping[str, Sequence[str]] # 优化映射
checkpointer: Checkpointer # 检查点存储
store: BaseStore | None # 持久化存储
cache: BaseCache | None # 节点结果缓存
retry_policy: Sequence[RetryPolicy] # 全局重试策略
cache_policy: CachePolicy | None # 全局缓存策略
interrupt_before_nodes: All | Sequence[str]
interrupt_after_nodes: All | Sequence[str]
step_timeout: float | None # 步超时6.2.1 invoke() 和 stream() 的关系
在 Pregel 的设计中,invoke() 是基于 stream() 实现的——它调用 stream() 收集所有输出,然后返回最终值:
python
def invoke(self, input, config=None, *, stream_mode="values", ...):
latest = None
for chunk in self.stream(
input, config,
stream_mode=["updates", "values"] if stream_mode == "values"
else stream_mode,
...
):
if stream_mode == "values":
mode, payload = chunk
if mode == "values":
latest = payload
return latest这意味着 stream() 才是真正的执行入口。所有的执行逻辑都围绕流式输出构建。
6.2.2 stream() 的执行框架
stream() 方法的核心结构如下:
python
def stream(self, input, config=None, *, stream_mode=None, ...):
# 1. 准备配置
stream_modes, output_keys, interrupt_before_, interrupt_after_, \
checkpointer, store, cache, durability_ = self._defaults(...)
# 2. 构建流式队列
stream = SyncQueue()
# 3. 进入 PregelLoop 上下文
with SyncPregelLoop(
input, stream=StreamProtocol(stream.put, stream_modes),
config=config, checkpointer=checkpointer,
nodes=self.nodes, specs=self.channels,
...
) as loop:
# 4. 创建 Runner
runner = PregelRunner(
submit=weakref.WeakMethod(loop.submit),
put_writes=weakref.WeakMethod(loop.put_writes),
)
# 5. BSP 主循环
while loop.tick():
for _ in runner.tick(
[t for t in loop.tasks.values() if not t.writes],
timeout=self.step_timeout,
schedule_task=loop.accept_push,
):
yield from _output(stream.get, ...)
loop.after_tick()这段代码精确地体现了 BSP 模型的三个阶段:
6.3 PregelLoop:执行循环的状态机
PregelLoop 是执行循环的核心,定义在 pregel/_loop.py 中。它不是一个简单的 while 循环,而是一个精心设计的状态机。
6.3.1 状态定义
python
class PregelLoop:
# 配置
config: RunnableConfig
nodes: Mapping[str, PregelNode]
specs: Mapping[str, BaseChannel | ManagedValueSpec]
input_keys: str | Sequence[str]
output_keys: str | Sequence[str]
stream_keys: str | Sequence[str]
# 运行时状态
step: int # 当前超步编号
stop: int # 最大步数
status: str # 状态机状态
tasks: dict[str, PregelExecutableTask] # 当前步的任务
output: Any | None # 最终输出
updated_channels: set[str] | None # 上一步更新的通道
# Checkpoint 状态
checkpoint: Checkpoint
checkpoint_config: RunnableConfig
checkpoint_metadata: CheckpointMetadata
checkpoint_pending_writes: list[PendingWrite]
checkpoint_previous_versions: dict[str, str | float | int]
# Channel 和 Managed Values
channels: Mapping[str, BaseChannel]
managed: ManagedValueMapping6.3.2 状态机转换
PregelLoop 的 status 字段可以是以下值之一:
6.3.3 SyncPregelLoop 的生命周期
SyncPregelLoop 实现为 Python 上下文管理器,其 __enter__ 方法执行完整的初始化:
python
def __enter__(self) -> Self:
# 1. 获取 Checkpoint
if not self.checkpointer:
saved = None
elif self.checkpoint_config[CONF].get(CONFIG_KEY_CHECKPOINT_ID):
saved = self.checkpointer.get_tuple(self.checkpoint_config)
else:
saved = self.checkpointer.get_tuple(self.checkpoint_config)
if saved is None:
saved = CheckpointTuple(
self.checkpoint_config, empty_checkpoint(),
{"step": -2}, None, []
)
elif self._migrate_checkpoint is not None:
self._migrate_checkpoint(saved.checkpoint)
# 2. 恢复 Checkpoint 状态
self.checkpoint = saved.checkpoint
self.checkpoint_metadata = saved.metadata
self.checkpoint_pending_writes = [...]
# 3. 初始化后台执行器
self.submit = self.stack.enter_context(
BackgroundExecutor(self.config)
)
# 4. 从 Checkpoint 恢复 Channel 状态
self.channels, self.managed = channels_from_checkpoint(
self.specs, self.checkpoint
)
# 5. 计算步数边界
self.step = self.checkpoint_metadata["step"] + 1
self.stop = self.step + self.config["recursion_limit"] + 1
# 6. 处理首步输入
self.updated_channels = self._first(
input_keys=self.input_keys,
updated_channels=...
)
return self这里的关键点:
- Checkpoint 恢复:如果存在之前的 Checkpoint(例如从中断点恢复),直接加载而非从空状态开始
step的计算:从 Checkpoint 的元数据中恢复步数,确保恢复执行时步数连续stop的计算:step + recursion_limit + 1。+1是因为比较条件是step > stop(严格大于),所以需要多一步的余量_first():处理首步输入——将用户输入写入 Channel,或者在恢复执行时跳过输入处理
6.4 _first():首步输入处理
_first() 方法是执行循环中最复杂的初始化逻辑,它需要区分三种场景: