Appearance
第7章 工具编排与并发执行
当模型在一次响应中同时调用 Grep、Read、Glob 三个工具搜索代码库时,这些调用是顺序执行还是并行执行?当模型一边读文件一边写文件时,系统如何保证写操作不会和读操作产生竞态条件?当某个 Bash 命令执行失败时,还在排队的其他工具调用该怎么处理?
这些问题的答案,隐藏在 Claude Code 的工具编排层中。这一层位于 API 响应解析和工具实际执行之间,它决定了哪些工具可以并行运行、哪些必须串行等待,以及在整个过程中如何维护上下文一致性。
本章将深入这个编排层的完整实现,从批次分区算法到并发执行引擎,从 Pre/Post Hooks 管道到文件状态追踪,揭示一个生产级 Agent 系统是如何高效且安全地管理数十个工具的并发调度的。理解工具编排的设计,不仅有助于理解 Claude Code 本身的行为特征(比如为什么有些操作明显更快),更能为构建自己的 Agent 系统提供可直接复用的工程模式。
本章要点
- 工具编排的核心是
runTools()async generator:它从 API 响应中提取工具调用块,分批执行并流式产出结果 - 批次分区算法:连续的读操作合并为一个并发批次,写操作独占一个串行批次
- 双模式执行引擎:
runToolsConcurrently()利用all()并发调度器实现有上限的并行执行,runToolsSerially()保证写操作的顺序一致性 - 完整的工具执行管道:每个工具调用经历输入校验、Pre-Hook、权限检查、实际执行、Post-Hook、结果收集六个阶段
- 文件状态缓存:
FileStateCache基于 LRU 策略缓存已读文件,避免重复 IO;FileHistory在每次写操作前创建备份,支持回退 - Hook 系统:
PreToolUse和PostToolUse钩子允许用户自定义命令在工具执行前后介入,实现权限控制、日志记录、输入改写等高级功能
7.1 工具编排总览
从 API 响应到工具执行
在第5章中,我们分析了 Claude Code 的流式响应处理。当 API 返回的助手消息中包含 tool_use 类型的内容块时,查询引擎需要提取这些工具调用并编排它们的执行。这个提取过程发生在 query.ts 的主循环中:
typescript
// 源码位置: src/query.ts
const toolUseBlocks: ToolUseBlock[] = []
// 流式处理中,每当检测到 tool_use 块就收集起来
const assistantMessages: AssistantMessage[] = []
// 流结束后,将工具调用交给编排层
const toolUpdates = streamingToolExecutor
? streamingToolExecutor.getRemainingResults()
: runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
for await (const update of toolUpdates) {
if (update.message) {
// 将工具执行结果加入消息流
}
}这里有一个关键的分支:系统可以使用 StreamingToolExecutor(流式工具执行器,在 API 响应流式传入过程中就开始执行工具)或者 runTools()(批量编排,等所有工具调用解析完毕后统一调度)。两者的编排逻辑相似,但 runTools() 是更核心的抽象,它清晰地展示了编排层的完整决策链路。
之所以存在两种模式,是因为实际场景中存在一个权衡:流式模式可以让工具尽早开始执行(API 还在返回后续 tool_use 块时,前面的工具已经在运行了),带来更低的端到端延迟;而批量模式拥有全部工具调用的全局视图,可以做出更优的分区决策。对于大多数交互式场景,流式模式的延迟优势更有价值;在某些需要精确控制执行顺序的场景(如查询辅助工具调用),批量模式更为可靠。
runTools() 的结构
runTools() 是整个编排层的入口,定义在 src/services/tools/toolOrchestration.ts 中。它是一个 async generator 函数,接收工具调用块列表,产出执行结果流:
typescript
// 源码位置: src/services/tools/toolOrchestration.ts
export async function* runTools(
toolUseMessages: ToolUseBlock[],
assistantMessages: AssistantMessage[],
canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
let currentContext = toolUseContext
for (const { isConcurrencySafe, blocks } of partitionToolCalls(
toolUseMessages,
currentContext,
)) {
if (isConcurrencySafe) {
// 并发执行读操作批次
const queuedContextModifiers: Record<
string,
((context: ToolUseContext) => ToolUseContext)[]
> = {}
for await (const update of runToolsConcurrently(
blocks, assistantMessages, canUseTool, currentContext,
)) {
if (update.contextModifier) {
const { toolUseID, modifyContext } = update.contextModifier
if (!queuedContextModifiers[toolUseID]) {
queuedContextModifiers[toolUseID] = []
}
queuedContextModifiers[toolUseID].push(modifyContext)
}
yield { message: update.message, newContext: currentContext }
}
// 批次结束后,按工具顺序应用上下文修改
for (const block of blocks) {
const modifiers = queuedContextModifiers[block.id]
if (!modifiers) continue
for (const modifier of modifiers) {
currentContext = modifier(currentContext)
}
}
yield { newContext: currentContext }
} else {
// 串行执行写操作批次
for await (const update of runToolsSerially(
blocks, assistantMessages, canUseTool, currentContext,
)) {
if (update.newContext) {
currentContext = update.newContext
}
yield { message: update.message, newContext: currentContext }
}
}
}
}这段代码的核心逻辑可以归纳为三步:分区 -> 判断 -> 执行。首先调用 partitionToolCalls() 将工具调用分为若干批次,然后根据每个批次的 isConcurrencySafe 标志选择并发或串行执行路径。
值得注意的是 currentContext 的管理策略。对于并发批次,上下文修改器(contextModifier)不会在执行过程中立即生效,而是先缓存起来,等整个批次执行完毕后再按照工具的原始顺序依次应用。这保证了并发执行不会导致上下文状态的不确定性。对于串行批次,每个工具执行完毕后上下文修改立即生效,后续工具看到的是更新后的上下文。
使用 async generator 的设计哲学
选择 async generator 作为编排层的核心抽象并非偶然。这种模式带来三个关键优势:
第一,流式产出。工具执行的结果不需要等所有工具跑完才返回,而是执行一个就产出一个。对于 UI 层来说,用户可以实时看到每个工具的执行进度和结果。
第二,可组合性。runTools() 产出的是 AsyncGenerator<MessageUpdate>,而 runToolsConcurrently() 和 runToolsSerially() 也是同样类型的 generator。它们可以自由嵌套和组合,而不需要复杂的回调或事件机制。
第三,背压控制。当消费者(UI 层或查询引擎)处理不过来时,generator 会自然地暂停产出,避免内存无限增长。这在工具大量并发执行时尤为重要。
第四,取消语义。async generator 天然支持取消——消费者只需要停止迭代(break 出 for-await-of 循环),generator 就会被垃圾回收。相比之下,基于 Promise.all 的并发方案在需要提前终止时会面临很大的复杂性。
在 Claude Code 的整个代码库中,async generator 是一种无处不在的模式。从 API 流式响应、工具编排、Hook 执行到 UI 更新,几乎所有涉及"随时间逐步产出结果"的场景都使用了这种抽象。这不是偶然的选择,而是经过反复验证的架构决策——它为复杂的异步流程提供了一种线性的、可组合的编程模型。
下图展示了 runTools() 的完整编排流程,从 API 响应中的工具调用块到最终结果产出的全链路:
7.2 批次分区与并发决策
partitionToolCalls 算法
批次分区是编排层最关键的决策点。partitionToolCalls() 函数将一组工具调用分为若干批次,每个批次要么全部并发执行,要么包含一个串行执行的工具:
typescript
// 源码位置: src/services/tools/toolOrchestration.ts
type Batch = { isConcurrencySafe: boolean; blocks: ToolUseBlock[] }
function partitionToolCalls(
toolUseMessages: ToolUseBlock[],
toolUseContext: ToolUseContext,
): Batch[] {
return toolUseMessages.reduce((acc: Batch[], toolUse) => {
const tool = findToolByName(toolUseContext.options.tools, toolUse.name)
const parsedInput = tool?.inputSchema.safeParse(toolUse.input)
const isConcurrencySafe = parsedInput?.success
? (() => {
try {
return Boolean(tool?.isConcurrencySafe(parsedInput.data))
} catch {
return false
}
})()
: false
if (isConcurrencySafe && acc[acc.length - 1]?.isConcurrencySafe) {
acc[acc.length - 1]!.blocks.push(toolUse)
} else {
acc.push({ isConcurrencySafe, blocks: [toolUse] })
}
return acc
}, [])
}这个算法使用了一次遍历的贪心策略,通过 reduce 累加器模式实现:从头到尾扫描所有工具调用,对每个工具判断其是否并发安全。如果当前工具是并发安全的,并且上一个批次也是并发安全的,就将当前工具追加到上一个批次(合并连续的读操作);否则开启一个新批次。注意这个算法是在线的(一次遍历),时间复杂度为 O(n),不需要回溯或全局优化。
一个重要的细节是,分区决策在执行前就确定了——它基于工具声明的并发安全性,而不是运行时的实际行为。这是一种静态分析策略,保守但可预测。
假设模型在一次响应中调用了以下工具序列:
Grep -> Read -> Read -> Edit -> Glob -> Read -> Bash(git status)分区结果将是:
批次1 [并发]: Grep, Read, Read -- 三个只读操作并行
批次2 [串行]: Edit -- 写操作独占
批次3 [并发]: Glob, Read, Bash -- Bash(git status) 是只读的,可以并行并发安全的判定标准
每个工具通过 isConcurrencySafe() 方法声明自己是否可以并发执行。这个判定与输入参数相关——同一个工具在不同输入下可能有不同的并发安全性。
以下是各类工具的典型并发安全性配置:
始终并发安全的工具(纯读操作):
typescript
// 源码位置: src/tools/GrepTool/GrepTool.ts
isConcurrencySafe() {
return true // 搜索操作总是安全的
}
// 源码位置: src/tools/FileReadTool/FileReadTool.ts
isConcurrencySafe() {
return true // 文件读取总是安全的
}
// 源码位置: src/tools/GlobTool/GlobTool.ts
isConcurrencySafe() {
return true // 文件匹配总是安全的
}条件并发安全的工具(取决于输入):
typescript
// 源码位置: src/tools/BashTool/BashTool.tsx
isConcurrencySafe(input) {
return this.isReadOnly?.(input) ?? false;
}
isReadOnly(input) {
const compoundCommandHasCd = commandHasAnyCd(input.command);
const result = checkReadOnlyConstraints(input, compoundCommandHasCd);
return result.behavior === 'allow';
}Bash 工具的并发安全性完全取决于命令本身是否只读。像 ls、cat、git status、wc -l 这样的命令被认为是只读的,可以并行执行;而 mkdir、rm、git commit 等则不行。这意味着相同的 Bash 工具,执行 git log 时可以和其他工具并行,执行 git push 时就必须独占。
Bash 工具的只读判定内部调用了 checkReadOnlyConstraints(),这是一个相当复杂的命令分析器,需要解析复合命令(管道、&&、|| 等)的每个子命令。它还需要特别处理 cd 命令——包含 cd 的复合命令(如 cd /tmp && git status)出于安全考虑被视为非只读,因为 cd 到恶意目录后执行的 git 命令可能触发 core.fsmonitor 攻击。
PowerShell 工具采用了与 Bash 相同的策略:isConcurrencySafe 直接委托给 isReadOnly。这意味着在 Windows 平台上,并发控制遵循完全一致的语义。
还有一类工具值得特别说明——AgentTool(子 Agent 工具)也声明了 isConcurrencySafe() { return true }。这看起来违反直觉,因为子 Agent 内部可能执行写操作。但设计者的考量是:每个子 Agent 有自己独立的执行上下文和 AbortController,它们之间的隔离已经在更高层保证了。将子 Agent 标记为并发安全,允许多个子 Agent 同时执行不同的任务,这是多 Agent 协作的基础。
始终不并发安全的工具(写操作或有副作用):
typescript
// 源码位置: src/Tool.ts (buildTool 默认值)
const TOOL_DEFAULTS = {
isConcurrencySafe: (_input?: unknown) => false, // 默认不安全
isReadOnly: (_input?: unknown) => false, // 默认假设有写操作
}默认值设为 false 体现了失败保守(fail-closed)的设计原则。如果一个工具的开发者忘记声明并发安全性,系统会将其当作不安全处理,确保不会出现意外的并发冲突。这与 isReadOnly 的默认值 false 和 isDestructive 的默认值 false 形成了一套一致的安全默认配置:未显式声明的工具被假定为"有写操作、不可并发、但非破坏性",这是一个合理的中间立场。
通过 buildTool() 工厂函数,所有工具定义都经过统一的默认值填充,保证了行为的一致性:
typescript
// 源码位置: src/Tool.ts
export function buildTool<D extends AnyToolDef>(def: D): BuiltTool<D> {
return {
...TOOL_DEFAULTS,
userFacingName: () => def.name,
...def,
} as BuiltTool<D>
}开发者只需要覆盖需要自定义的方法,其余自动获得安全的默认行为。
异常安全的判定过程
注意 partitionToolCalls 中判定并发安全性的代码被包裹在 try-catch 中:
typescript
const isConcurrencySafe = parsedInput?.success
? (() => {
try {
return Boolean(tool?.isConcurrencySafe(parsedInput.data))
} catch {
// 如果 isConcurrencySafe 抛异常(例如 shell-quote 解析失败),
// 保守地视为不安全
return false
}
})()
: false这个设计处理了两个边界情况:如果输入校验失败(parsedInput 不成功),直接视为不安全;如果 isConcurrencySafe 方法本身抛出异常(例如 Bash 工具的命令解析器遇到格式错误的命令),同样视为不安全。这种防御式编程确保了判定过程本身不会成为系统的故障点。
7.3 工具执行管道
管道概览
每个工具调用从进入编排层到产出结果,要经过一条完整的六阶段执行管道。下面的时序图展示了各阶段之间的交互关系:
这条管道定义在 src/services/tools/toolExecution.ts 的 checkPermissionsAndCallTool() 函数中:
输入校验(Zod) -> 工具级校验(validateInput) -> Pre-Hooks -> 权限解析 -> 实际执行 -> Post-Hooks -> 结果收集下面逐个分析每个阶段。
第一阶段:输入校验
当模型产出工具调用时,参数可能不符合工具的 schema 定义。系统首先使用 Zod 做类型级校验:
typescript
// 源码位置: src/services/tools/toolExecution.ts
const parsedInput = tool.inputSchema.safeParse(input)
if (!parsedInput.success) {
let errorContent = formatZodValidationError(tool.name, parsedInput.error)
// 如果工具是延迟加载的,检查其 schema 是否已发送给 API
const schemaHint = buildSchemaNotSentHint(
tool, toolUseContext.messages, toolUseContext.options.tools,
)
if (schemaHint) {
errorContent += schemaHint
}
// 返回格式化的错误消息给模型
return [{
message: createUserMessage({
content: [{
type: 'tool_result',
content: `<tool_use_error>InputValidationError: ${errorContent}</tool_use_error>`,
is_error: true,
tool_use_id: toolUseID,
}],
}),
}]
}错误消息被格式化为 <tool_use_error> XML 标签包裹的文本,通过 tool_result 类型回传给模型。模型收到这个错误后会自动尝试修正参数并重新调用。这种设计让模型具有"自纠错"能力——收到格式化的错误信息后,模型通常会在下一轮调用中修正参数格式。
Zod 校验错误的格式化也经过了精心设计。formatZodValidationError() 函数将 Zod 的结构化错误对象转换为人类(和模型)可读的文本,区分了三类问题:缺少必填参数、提供了未知参数、参数类型不匹配。每种错误都带有清晰的参数路径(如 todos[0].activeForm),帮助模型精确定位问题。
值得注意的是 buildSchemaNotSentHint() 的处理。对于延迟加载的工具(通过 ToolSearch 发现的工具),如果模型在没有获取完整 schema 的情况下就尝试调用,Zod 校验几乎必然失败(因为类型化参数如数组、布尔值会被当作字符串发送)。这时系统会追加一个提示,引导模型先调用 ToolSearch 加载工具 schema。
接下来是工具特定的业务校验:
typescript
// 源码位置: src/services/tools/toolExecution.ts
const isValidCall = await tool.validateInput?.(parsedInput.data, toolUseContext)
if (isValidCall?.result === false) {
return [{
message: createUserMessage({
content: [{
type: 'tool_result',
content: `<tool_use_error>${isValidCall.message}</tool_use_error>`,
is_error: true,
tool_use_id: toolUseID,
}],
}),
}]
}validateInput() 是可选的工具级校验,用于检查 Zod 无法覆盖的语义约束。例如,文件编辑工具可以在这里验证目标文件路径是否在允许的工作目录内。
第二阶段:Pre-Hooks 执行
输入校验通过后,进入 Pre-Hook 阶段。这是用户自定义扩展的第一个注入点:
typescript
// 源码位置: src/services/tools/toolExecution.ts
let hookPermissionResult: PermissionResult | undefined
for await (const result of runPreToolUseHooks(
toolUseContext, tool, processedInput, toolUseID,
assistantMessage.message.id, requestId, mcpServerType, mcpServerBaseUrl,
)) {
switch (result.type) {
case 'message':
// 收集 Hook 产出的消息(进度、附件等)
break
case 'hookPermissionResult':
hookPermissionResult = result.hookPermissionResult
break
case 'hookUpdatedInput':
processedInput = result.updatedInput
break
case 'preventContinuation':
shouldPreventContinuation = result.shouldPreventContinuation
break
case 'stopReason':
stopReason = result.stopReason
break
case 'stop':
// Hook 要求停止执行,返回错误消息
return resultingMessages
}
}