Skip to content

第8章 Checkpoint 持久化

8.1 引言

在前面的章节中,我们深入剖析了 Pregel 执行循环如何驱动图的运行、Channel 如何承载状态的流转、以及任务调度器如何协调节点的并发执行。然而,所有这些精密的运行时机制都面临一个根本性问题:当进程终止、网络中断或用户需要暂停时,图的执行状态将随之消失。

Checkpoint(检查点)机制正是 LangGraph 对这一问题的系统性回答。它不仅仅是简单的"保存与恢复",而是一套精心设计的持久化架构,涵盖了从数据结构定义、序列化协议、版本追踪到多后端存储适配的完整链路。Checkpoint 是 LangGraph 实现中断恢复、人机协作、时间旅行调试等高级特性的基石。

本章将从源码层面深入分析 LangGraph 1.1.6 的 Checkpoint 持久化体系。我们将逐一剖析 Checkpoint 数据结构的每个字段、BaseCheckpointSaver 基类的接口设计、JsonPlusSerializer 的序列化策略、以及 SQLite/Postgres 两种生产级实现的存储方案。理解 Checkpoint 体系不仅有助于正确使用 LangGraph 的持久化功能,更能帮助开发者在面对自定义存储后端、性能优化或调试状态恢复问题时做出正确的架构决策。

要真正理解 Checkpoint 为何如此设计,我们需要先思考状态持久化面临的核心挑战。首先,图的状态不是简单的键值对——它包含 Channel 的值、版本信息和节点的执行进度,这些信息必须作为一个原子单元被保存和恢复。其次,持久化的粒度需要精心选择——太粗会丢失中间状态,太细会拖慢执行速度。最后,序列化必须能够处理 Python 生态中丰富多样的类型系统,从简单的字典到复杂的 Pydantic 模型、从日期时间到 NumPy 数组,都需要可靠地序列化和反序列化。LangGraph 的 Checkpoint 体系对这些挑战给出了优雅而务实的回答。

本章要点

  1. Checkpoint 数据结构:理解 Checkpoint TypedDict 的六个核心字段及其在状态恢复中的作用
  2. CheckpointSaver 基类:掌握 get_tuple/put/list/put_writes 四大核心方法的契约
  3. CheckpointMetadata 与寻址:理解 thread_id/checkpoint_id/checkpoint_ns 三维寻址体系
  4. 序列化体系:深入 JsonPlusSerializer 基于 ormsgpack 的类型安全序列化协议
  5. 持久化模式:区分 sync/async/exit 三种持久化策略的适用场景
  6. 存储实现:对比 InMemory/SQLite/Postgres 三种实现的架构差异

8.2 Checkpoint 数据结构

8.2.1 Checkpoint TypedDict

Checkpoint 是 LangGraph 持久化体系的核心数据结构。它定义在 langgraph.checkpoint.base 模块中,使用 Python 的 TypedDict 来提供类型安全的字典访问:

python
# langgraph/checkpoint/base/__init__.py

class Checkpoint(TypedDict):
    """State snapshot at a given point in time."""

    v: int
    """The version of the checkpoint format. Currently 1."""
    id: str
    """The ID of the checkpoint. Both unique and monotonically increasing."""
    ts: str
    """The timestamp of the checkpoint in ISO 8601 format."""
    channel_values: dict[str, Any]
    """The values of the channels at the time of the checkpoint."""
    channel_versions: ChannelVersions
    """The versions of the channels at the time of the checkpoint."""
    versions_seen: dict[str, ChannelVersions]
    """Map from node ID to map from channel name to version seen."""
    updated_channels: list[str] | None
    """The channels that were updated in this checkpoint."""

每个字段都承担着精确的职责:

v(格式版本):当前 Pregel 层使用的版本号为 4(定义在 pregel/_checkpoint.pyLATEST_VERSION = 4),而 checkpoint 基类中的版本为 2。这个版本号用于在反序列化时进行格式迁移。

id(检查点标识):采用 UUID v6 生成,这是一个关键的设计选择。UUID v6 将时间戳编码到高位,使得 ID 既全局唯一又单调递增。这意味着可以直接通过字符串比较来确定检查点的先后顺序,无需额外的排序字段。

ts(时间戳):ISO 8601 格式的 UTC 时间戳,主要用于调试和审计,不参与排序逻辑。

channel_values(通道值):存储所有 Channel 在该时刻的快照值。这是状态恢复的关键数据。需要注意的是,并非所有 Channel 都会出现在这个字典中——只有在 channel_versions 中有记录(即至少被写入过一次)的 Channel 才会被保存。这种按需保存的策略避免了为未使用的 Channel 浪费存储空间。在 Postgres 实现中,channel_values 并不直接存储在 checkpoints 表中,而是通过 channel_versions 映射到独立的 checkpoint_blobs 表,实现了跨检查点的去重。

channel_versions(通道版本):记录每个 Channel 的当前版本号,用于增量更新判断。版本号的类型由 BaseCheckpointSaver 的泛型参数 V 决定,可以是 intfloatstr。每当一个 Channel 被写入时,其版本号通过 get_next_version 方法递增。这个版本追踪机制是 LangGraph 实现增量持久化的基础——通过比较前后两个检查点的 channel_versions,可以精确知道哪些 Channel 发生了变化,从而只持久化变化的部分。

versions_seen(已见版本):二维映射,记录每个节点上次执行时看到的各 Channel 版本。这是 Pregel 调度器判断"哪些节点需要重新执行"的核心依据。当 channel_versions[channel] > versions_seen[node][channel] 时,说明该 Channel 在节点上次执行之后被更新过,节点需要被重新调度。这个机制确保了图的执行遵循数据驱动的原则——只有当输入数据确实发生变化时,节点才会被触发。

updated_channels:记录在当前检查点中被更新的 Channel 列表。这个字段在 v4 版本中引入,用于优化 prepare_next_tasks 的性能——通过直接查阅被更新的 Channel 列表,避免了遍历所有 Channel 比较版本号的开销。当值为 None 时,回退到全量版本比较。

8.2.2 UUID v6:单调递增的标识符

LangGraph 没有使用标准库的 UUID v4,而是自行实现了 UUID v6。这个选择背后有深思熟虑的工程考量:

python
# langgraph/checkpoint/base/id.py

def uuid6(node=None, clock_seq=None) -> UUID:
    global _last_v6_timestamp
    nanoseconds = time.time_ns()
    timestamp = nanoseconds // 100 + 0x01B21DD213814000
    if _last_v6_timestamp is not None and timestamp <= _last_v6_timestamp:
        timestamp = _last_v6_timestamp + 1
    _last_v6_timestamp = timestamp
    if clock_seq is None:
        clock_seq = random.getrandbits(14)
    if node is None:
        node = random.getrandbits(48)
    time_high_and_time_mid = (timestamp >> 12) & 0xFFFFFFFFFFFF
    time_low_and_version = timestamp & 0x0FFF
    uuid_int = time_high_and_time_mid << 80
    uuid_int |= time_low_and_version << 64
    uuid_int |= (clock_seq & 0x3FFF) << 48
    uuid_int |= node & 0xFFFFFFFFFFFF
    return UUID(int=uuid_int, version=6)

UUID v6 的核心特性是时间有序性:时间戳占据高位字节,使得按字符串排序就等同于按时间排序。这在数据库中带来两个关键优势:

  1. 索引友好:B-tree 索引在处理单调递增的主键时效率最高,避免了 UUID v4 造成的随机写入
  2. 天然排序max(checkpoints.keys()) 就能找到最新的检查点,无需额外查询

注意 clock_seq 参数的特殊用法:在 create_checkpoint 中,clock_seq=step 将步骤号编码进 UUID,而 empty_checkpoint 使用 clock_seq=-2 作为哨兵值。此外,全局变量 _last_v6_timestamp 保证了即使在同一纳秒内多次调用 uuid6(),生成的 ID 也是严格递增的。这对于高频创建检查点的场景(如并行子图执行)至关重要。

从数据库性能的角度看,UUID v6 作为主键的优势是巨大的。传统的 UUID v4 由于随机性导致 B-tree 索引的页面分裂频繁,写入性能随数据量增长而显著下降。而 UUID v6 的时间有序性使得新记录总是追加到索引的末尾,避免了随机插入带来的索引碎片化。在 LangGraph 的使用场景中,检查点是高频创建的——每个执行步骤至少创建一个检查点。选择 UUID v6 而非递增整数的原因在于:UUID 不需要中央协调就能保证全局唯一性,这对于分布式部署和多进程场景至关重要。

8.2.3 CheckpointTuple 与 CheckpointMetadata

Checkpoint 本身只存储图的运行时状态。围绕它,还有两个重要的伴生类型:

python
class CheckpointMetadata(TypedDict, total=False):
    source: Literal["input", "loop", "update", "fork"]
    step: int
    parents: dict[str, str]
    run_id: str

class CheckpointTuple(NamedTuple):
    config: RunnableConfig
    checkpoint: Checkpoint
    metadata: CheckpointMetadata
    parent_config: RunnableConfig | None = None
    pending_writes: list[PendingWrite] | None = None

CheckpointMetadatasource 字段记录了检查点的来源,这对于理解图的执行历史至关重要:

  • "input":由图的输入触发创建(step = -1)
  • "loop":在 Pregel 循环的每次迭代中创建
  • "update":由手动状态更新(update_state)创建
  • "fork":从另一个检查点复制而来

CheckpointTuple 则是检查点系统的"完整视图",将检查点本体、元数据、配置和待处理写入打包在一起。使用 NamedTuple 而非 TypedDict 意味着它是不可变的,这保证了从存储层返回的检查点不会被意外修改。

pending_writes 尤为重要——它存储了尚未被合并到下一个检查点的中间写入,这是实现中断恢复的关键。每个待处理写入是一个 (task_id, channel, value) 三元组,记录了哪个任务向哪个 Channel 写入了什么值。当图从中断恢复时,这些写入会被重放,避免重新执行已完成的节点。parent_config 字段指向上一个检查点的配置,这构成了一条单向链表,允许沿着检查点历史回溯——这是时间旅行功能的数据基础。

8.3 三维寻址体系

LangGraph 使用三个维度来唯一定位一个检查点:

8.3.1 thread_id:会话标识

thread_id 是最外层的分区键。它将不同的对话或工作流隔离在独立的命名空间中:

python
config = {"configurable": {"thread_id": "user-alice-conv-1"}}
graph.invoke(inputs, config)

同一个 thread_id 下的所有检查点构成一条线性的执行历史链。重用 thread_id 可以实现对话记忆的累积,而每次使用新的 thread_id 则开启独立的执行。在生产系统中,thread_id 的选择策略需要根据业务场景仔细考虑:对于客服对话,可以使用用户会话 ID;对于数据处理流水线,可以使用批次 ID 加任务 ID 的组合;对于定时任务,可以使用调度时间戳。需要注意的是,thread_id 是字符串类型,如果传入了非字符串值,PregelLoop 会自动将其转换为字符串。

8.3.2 checkpoint_ns:命名空间

checkpoint_ns(checkpoint namespace)用于支持子图。根图的命名空间为空字符串 "",子图的命名空间由父图的命名空间和分隔符组成:

python
NS_SEP = "|"    # 层级分隔符
NS_END = ":"    # 命名空间与任务ID的分隔符

# 例如:根图节点 "agent" 的子图,其命名空间为:
# "agent:task-id-xxx|inner_agent:task-id-yyy"

这种层级命名空间设计使得子图可以拥有独立的检查点历史,同时保持与父图的关联。

8.3.3 checkpoint_id:检查点标识

checkpoint_id 是 UUID v6 格式的标识符,在给定的 (thread_id, checkpoint_ns) 下唯一标识一个检查点。由于 UUID v6 的单调递增特性,省略 checkpoint_id 时默认返回最新的检查点。

8.4 BaseCheckpointSaver 基类

8.4.1 核心接口设计

BaseCheckpointSaver 是所有检查点存储后端必须实现的抽象基类。它定义了四组核心操作(同步+异步):

python
class BaseCheckpointSaver(Generic[V]):
    serde: SerializerProtocol = JsonPlusSerializer()

    def __init__(self, *, serde: SerializerProtocol | None = None) -> None:
        self.serde = maybe_add_typed_methods(serde or self.serde)

    # 1. 读取单个检查点
    def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
        raise NotImplementedError

    # 2. 列出检查点
    def list(self, config, *, filter=None, before=None, limit=None
    ) -> Iterator[CheckpointTuple]:
        raise NotImplementedError

    # 3. 存储检查点
    def put(self, config, checkpoint, metadata, new_versions
    ) -> RunnableConfig:
        raise NotImplementedError

    # 4. 存储中间写入
    def put_writes(self, config, writes, task_id, task_path=""
    ) -> None:
        raise NotImplementedError

基于 VitePress 构建