Appearance
第9章 中断与人机协作
9.1 引言
在 AI Agent 的实际应用中,纯自动化的执行流程往往不够。当 Agent 需要进行高风险操作(如转账、删除数据)、当决策需要人类专业判断(如医疗诊断确认)、或者当信息不足需要用户补充时,系统必须能够优雅地暂停执行,等待人类介入,然后从暂停点恢复。
这就是 LangGraph 中断(Interrupt)机制要解决的问题。与传统的回调或轮询方案不同,LangGraph 的中断是一种持久化的暂停——执行状态被完整保存到 Checkpoint 中,进程可以终止、重启,甚至迁移到另一台机器上,只要提供正确的 thread_id,就能从中断点恢复执行。
本章将从源码层面深入分析中断机制的完整实现:从 interrupt() 函数如何抛出异常、到 GraphInterrupt 如何被 Pregel 循环捕获、再到恢复时如何通过 Command(resume=...) 将值传回节点。我们还将探讨 interrupt_before/interrupt_after 的配置式中断,以及多中断场景下的匹配策略。
为了充分理解 LangGraph 中断机制的精妙设计,有必要先反思传统的暂停/恢复方案。最简单的做法是在每个可能中断的位置设置回调函数,让用户代码手动管理暂停逻辑。这种方法的问题在于:它将中断的责任分散到了业务代码中,增加了认知负担;而且回调模式难以持久化——一旦进程终止,回调的上下文就丢失了。另一种方案是使用协程或 async/await,但这要求所有节点函数都是异步的,而且 Python 的协程状态无法可靠地序列化。LangGraph 选择了第三种路径:基于异常的暂停加重新执行的恢复。这种方式让节点函数保持简单(可以是普通同步函数),同时通过 Checkpoint 实现真正的持久化暂停。代价是恢复时需要重新执行整个节点,但通过 scratchpad 的索引追踪,已解决的中断可以立即返回缓存值,实际的重复执行开销极小。
本章要点
- interrupt() 函数:理解其基于异常的暂停机制和 scratchpad 的索引追踪
- Interrupt 数据类型:掌握中断值的结构及其基于命名空间的确定性 ID 生成
- interrupt_before/interrupt_after:区分编译时配置的声明式中断与运行时的命令式中断
- 暂停与恢复机制:深入 Pregel 循环如何捕获中断、保存状态、以及恢复执行
- 与 Checkpoint 的配合:理解中断信息如何作为 pending_writes 持久化
- 多中断与按 ID 恢复:掌握同一节点中多个中断的索引匹配和按 ID 精确恢复
9.2 Interrupt 数据类型
9.2.1 Interrupt 类定义
Interrupt 是一个不可变的数据类,用于封装中断信息:
python
# langgraph/types.py
@final
@dataclass(init=False, slots=True)
class Interrupt:
value: Any
"""The value associated with the interrupt."""
id: str
"""The ID of the interrupt. Can be used to resume the interrupt directly."""
def __init__(self, value: Any, id: str = _DEFAULT_INTERRUPT_ID, **deprecated_kwargs):
self.value = value
if (
(ns := deprecated_kwargs.get("ns", MISSING)) is not MISSING
and (id == _DEFAULT_INTERRUPT_ID)
and (isinstance(ns, Sequence))
):
self.id = xxh3_128_hexdigest("|".join(ns).encode())
else:
self.id = id
@classmethod
def from_ns(cls, value: Any, ns: str) -> Interrupt:
return cls(value=value, id=xxh3_128_hexdigest(ns.encode()))两个核心属性:
value:中断携带的值,可以是任意类型。它被传递给客户端,用于展示中断原因或请求用户输入。例如 "请确认是否执行转账操作?" 或 {"question": "请选择以下选项", "options": ["A", "B", "C"]}。
id:中断的唯一标识符。注意它不是随机生成的 UUID,而是通过 xxh3_128_hexdigest 对检查点命名空间进行哈希计算得出的确定性 ID。这意味着同一个执行路径上的同一个中断,在不同运行中会生成相同的 ID。
python
# 中断 ID 的确定性生成
Interrupt.from_ns(
value="请确认操作",
ns=conf[CONFIG_KEY_CHECKPOINT_NS], # 例如 "agent:task-id-123"
)
# id = xxh3_128_hexdigest(b"agent:task-id-123")这个设计选择服务于多中断场景下的精确恢复:客户端可以通过中断 ID 指定要恢复哪个中断。由于 xxh3_128_hexdigest 是一个非密码学的高速哈希函数,ID 的计算几乎没有性能开销。128 位的输出空间足够大,碰撞概率可以忽略不计。
@final 装饰器和 slots=True 的使用也值得注意。@final 禁止了子类化,这确保了 Interrupt 的行为在整个系统中是一致和可预测的——没有用户代码可以通过继承来改变中断的序列化或比较行为。slots=True 则通过使用 __slots__ 而非 __dict__ 来存储属性,减少了内存开销并略微提升了属性访问速度。这些都是面向高性能场景的微优化,体现了 LangGraph 在底层基础设施上的精细打磨。
9.2.2 GraphInterrupt 异常
GraphInterrupt 是中断机制的传播载体:
python
# langgraph/errors.py
class GraphBubbleUp(Exception):
"""所有需要向上冒泡的异常的基类"""
pass
class GraphInterrupt(GraphBubbleUp):
"""Raised when a subgraph is interrupted,
suppressed by the root graph. Never raised directly."""
def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None:
super().__init__(interrupts)GraphInterrupt 继承自 GraphBubbleUp,这是 LangGraph 中所有需要跨层传播的异常的基类。关键设计:GraphInterrupt 在根图中被抑制,不会泄漏到用户代码中。它只是一种内部的控制流机制。
9.3 interrupt() 函数深度解析
9.3.1 核心实现
interrupt() 函数是用户在节点中触发中断的唯一入口:
python
# langgraph/types.py
def interrupt(value: Any) -> Any:
from langgraph._internal._constants import (
CONFIG_KEY_CHECKPOINT_NS, CONFIG_KEY_SCRATCHPAD,
CONFIG_KEY_SEND, RESUME,
)
from langgraph.config import get_config
from langgraph.errors import GraphInterrupt
conf = get_config()["configurable"]
# 1. 追踪中断索引
scratchpad = conf[CONFIG_KEY_SCRATCHPAD]
idx = scratchpad.interrupt_counter()
# 2. 查找之前保存的恢复值
if scratchpad.resume:
if idx < len(scratchpad.resume):
conf[CONFIG_KEY_SEND]([(RESUME, scratchpad.resume)])
return scratchpad.resume[idx]
# 3. 查找当前的恢复值(来自 Command(resume=...))
v = scratchpad.get_null_resume(True)
if v is not None:
assert len(scratchpad.resume) == idx
scratchpad.resume.append(v)
conf[CONFIG_KEY_SEND]([(RESUME, scratchpad.resume)])
return v
# 4. 没有恢复值,抛出中断
raise GraphInterrupt((
Interrupt.from_ns(
value=value,
ns=conf[CONFIG_KEY_CHECKPOINT_NS],
),
))这个函数的精妙之处在于它的双重身份:第一次调用时它是一个"暂停器"(抛出异常),恢复后重新执行时它是一个"值提供器"(返回恢复值)。从用户的角度看,interrupt() 就像一个普通的阻塞式输入函数——调用它时程序暂停,收到输入后继续。但在底层,这两次调用实际上发生在不同的进程生命周期中,中间可能经历了数分钟甚至数天的等待。Checkpoint 机制弥合了这种时间断裂,使得节点函数的编写者不需要关心持久化的细节。
理解这个函数的关键在于认识到它的四个分支:第一个分支(步骤 1)通过 scratchpad 的 interrupt_counter 追踪当前是第几个中断调用;第二个分支(步骤 2)检查是否有之前保存的恢复值列表,如果当前索引在范围内则直接返回;第三个分支(步骤 3)尝试获取新提供的恢复值;第四个分支(步骤 4)在没有任何恢复值时抛出异常。这种分层的查找策略使得多中断场景可以正确工作——前面的中断返回已缓存的值,最新的中断返回新提供的值,再后面的中断抛出异常等待用户输入。
9.3.2 PregelScratchpad 的角色
PregelScratchpad 是每个任务执行期间的临时工作区:
python
@dataclasses.dataclass(**_DC_KWARGS)
class PregelScratchpad:
step: int
stop: int
call_counter: Callable[[], int]
interrupt_counter: Callable[[], int] # 追踪当前任务的中断索引
get_null_resume: Callable[[bool], Any]
resume: list[Any] # 之前保存的恢复值列表
subgraph_counter: Callable[[], int]interrupt_counter 是一个闭包,每次调用自增并返回当前索引。它在每个任务开始执行时被初始化为从零开始的计数器。使用闭包而非实例变量的原因是每个任务需要独立的计数器——在并行执行的场景中,多个任务可能同时包含 interrupt() 调用,它们的索引必须互不干扰。这使得同一个节点中的多个 interrupt() 调用可以按顺序匹配恢复值:
python
# 节点中的多个中断
def review_node(state):
# 第一次 interrupt: idx=0
name = interrupt("请输入您的姓名")
# 第二次 interrupt: idx=1
age = interrupt("请输入您的年龄")
return {"name": name, "age": age}