Claude Code 源码分析第 04 课 · 第 01
第 04 课

查询引擎

Claude Code 怎么样 QueryEngine 协调每个对话轮次——从第一条用户消息到流令牌、工具调用、重试、自动压缩和停止挂钩——然后再循环返回更多内容。

1. 大局观

当您向 Claude Code 发送消息时,它至少会通过 四个不同的层 在模型的回复到达您的终端之前。 了解这些层是理解 Claude Code 为什么会这样做的关键 - 为什么它会重试、为什么会压缩、为什么它可以在流中间运行工具。

  1. QueryEngine.submitMessage() 验证提示,构建系统提示,解析模型,记录文字记录,然后移交给 query().
  2. 查询()→查询循环() An async function* 循环直到模型停止调用工具。 每次迭代都是一次模型调用。
  3. 查询模型/调用模型 通过 SDK 的流接口调用 Anthropic API,将所有内容包装在 withRetry().
  4. 停止挂钩和代币预算 模型完成每一轮后,外部钩子运行; 代币预算决定是否再次注入微移和循环。
关键见解 Claude Code 的每个公共表面 — REPL、SDK、远程 Claude Code — 通过相同的漏斗 query() 发电机。 生成器是轮流如何工作的唯一事实来源。

2. 时序图

以下是至少涉及一次工具调用的单个对话回合的完整消息流。 跟随箭头:之间的循环 queryLoop and queryModel 是主体行为的核心。

sequenceDiagram participant User participant QE as QueryEngine
submitMessage() participant Q as query() /
queryLoop() participant QM as queryModel
(claude.ts) participant API as Anthropic API
(streaming) participant Tools as Tool
Executor participant SH as stopHooks.ts User->>QE: submitMessage(prompt) QE->>QE: fetchSystemPromptParts()
buildSystemInitMessage() QE->>Q: query({ messages, systemPrompt, ... }) Q->>Q: applyToolResultBudget()
microcompact / snip / autocompact loop queryLoop — one iteration per model call Q->>QM: callModel({ messages, tools, ... }) QM->>API: POST /v1/messages (streaming, withRetry) API-->>QM: stream: content_block_delta events QM-->>Q: yield AssistantMessage (text / tool_use blocks) Q->>Q: StreamingToolExecutor tracks tool_use blocks alt tool_use blocks present Q->>Tools: runTools(toolUseBlocks) Tools-->>Q: yield progress + tool_result UserMessages Q->>Q: append tool_results to messages Note over Q: needsFollowUp = true → loop continues else no tool calls Note over Q: needsFollowUp = false Q->>SH: handleStopHooks() SH-->>Q: yield hook progress/attachments alt hook blocking error Q->>Q: append blockingError, loop again else hook prevents continuation Q-->>QE: Terminal { reason: 'stop_hook_prevented' } else clean stop Q->>Q: checkTokenBudget() alt budget says continue Q->>Q: inject nudge message, loop again else budget says stop Q-->>QE: Terminal { reason: 'completed' } end end end end QE-->>User: yield SDKMessage stream
(assistant / user / result)
读图 The loop 盒子不仅仅是一个图表惯例——它直接映射到 while (true) 在第 307 课 行 query.ts。 该循环的每次迭代都是一次 API 调用。

3. QueryEngine——每个对话一个引擎

QueryEngine 是一个 有状态类 每个对话实例化一次。 它保存可变消息历史记录、令牌使用总量、权限拒绝和中止控制器。 每次致电 submitMessage() 是该对话中的一个“回合”。

QueryEngine.ts (simplified)export class QueryEngine {
  private mutableMessages: Message[]
  private abortController: AbortController
  private totalUsage: NonNullableUsage
  private permissionDenials: SDKPermissionDenial[]

  // Turn-scoped: cleared at start of each submitMessage() call
  private discoveredSkillNames = new Set<string>()

  async *submitMessage(
    prompt: string | ContentBlockParam[],
    options?: { uuid?: string; isMeta?: boolean },
  ): AsyncGenerator<SDKMessage> {
    // 1. Build system prompt (fetchSystemPromptParts)
    // 2. processUserInput — handles slash commands
    // 3. recordTranscript — persists BEFORE the API call
    // 4. yield* query({ messages, ... })
    // 5. yield final result SDKMessage
  }
}

为什么脚本要在 API 调用之前写入

Before query() 甚至被称为 submitMessage() 将用户的消息保存到磁盘。 这意味着会话是 resumable 即使该进程在模型响应之前被终止。 来源中的评论很有启发性:

// If the process is killed before that (e.g. user clicks Stop in
// cowork seconds after send), the transcript is left with only
// queue-operation entries; getLastSessionLog filters those out,
// returns null, and --resume fails with "No conversation found".
// Writing now makes the transcript resumable from the point the
// user message was accepted, even if no API response ever arrives.
SDK 与 REPL QueryEngine 由 SDK/headless 路径使用。 REPL 有自己的接线 ask() 但调用相同 query() 下面的功能。

4. queryLoop() 内部——While(true) 核心

queryLoop() in query.ts 是一个 while(true) 带有类型的循环 State 迭代之间的对象。 而不是九个分开 let 变量,单个 state = { ... } 每次重新分配 继续网站 使转换变得明确且可审计。

query.tstype State = {
  messages: Message[]
  toolUseContext: ToolUseContext
  autoCompactTracking: AutoCompactTrackingState | undefined
  maxOutputTokensRecoveryCount: number
  hasAttemptedReactiveCompact: boolean
  maxOutputTokensOverride: number | undefined
  pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined
  stopHookActive: boolean | undefined
  turnCount: number
  transition: Continue | undefined   // WHY we looped again
}

继续过渡——循环的七个原因

The transition 现场记录 why 循环继续。 这使得继续站点能够自我记录,并让测试断言触发了哪个恢复路径,而无需检查消息内容:

transition.reasonMeaning
max_output_tokens_escalate首创8k上限; 以 64k max_tokens 重试
max_output_tokens_recovery模型达到输出极限; 注入恢复微移(最多 3 倍)
reactive_compact_retry提示-太长→压缩历史→重试
collapse_drain_retry提示太长→耗尽上下文崩溃阶段→重试
stop_hook_blocking停止钩子返回阻塞错误; 重新查询,错误为用户消息
token_budget_continuation代币预算表明工作尚未完成; 注入微移并继续
(需要后续跟进)正常:模型返回tool_use块→运行工具→循环
终止条件 循环退出(返回 Terminal) 在: completed, blocking_limit, model_error, prompt_too_long, aborted_streaming, stop_hook_prevented, image_error。 每个映射到不同的用户可见结果。

5. 流媒体和 API 层

queryModel in claude.ts 是一个 async function* 调用 Anthropic beta 消息端点并将每个流事件重新生成为内部 AssistantMessage or StreamEvent.

query.ts (inner stream loop, simplified)for await (const message of deps.callModel({
  messages: prependUserContext(messagesForQuery, userContext),
  systemPrompt: fullSystemPrompt,
  thinkingConfig: toolUseContext.options.thinkingConfig,
  tools: toolUseContext.options.tools,
  signal: toolUseContext.abortController.signal,
  options: { model: currentModel, fallbackModel, ... },
})) {
  if (message.type === 'assistant') {
    assistantMessages.push(message)
    // tool_use blocks trigger needsFollowUp = true
    const toolBlocks = message.message.content
      .filter(b => b.type === 'tool_use')
    if (toolBlocks.length > 0) needsFollowUp = true
  }
  yield yieldMessage // surfaces to SDK caller / REPL
}

流媒体工具执行

When config.gates.streamingToolExecution 已启用,一个 StreamingToolExecutor 消防工具 当流仍然打开时。 输入较早到达的工具开始与仍然生成文本的模型并行执行,从而减少多工具轮流的延迟。

墓碑消息 如果流回退在中流触发,部分接收 AssistantMessage 对象是 tombstoned — 发动机产量 { type: 'tombstone', message } 因此 UI 和脚本可以删除它们。 这可以防止重试时出现“思维块无法修改”API 错误。
深入探讨:withRetry() — 指数退避、529 秒和 OAuth 刷新

每个 API 调用都会经过 withRetry() in services/api/withRetry.ts。 该函数是一个 async function* 重试最多 DEFAULT_MAX_RETRIES = 10 默认情况下,产生一个 SystemAPIErrorMessage 每次睡眠前,用户都会看到实时状态更新。

// Backoff formula (from withRetry.ts)
export function getRetryDelay(
  attempt: number,
  retryAfterHeader?: string | null,
  maxDelayMs = 32000,
): number {
  if (retryAfterHeader) {
    const seconds = parseInt(retryAfterHeader, 10)
    if (!isNaN(seconds)) return seconds * 1000
  }
  const baseDelay = Math.min(
    BASE_DELAY_MS * Math.pow(2, attempt - 1),
    maxDelayMs,
  )
  const jitter = Math.random() * 0.25 * baseDelay
  return baseDelay + jitter
}

关键重试决策规则:

  • 529(超载): 仅前台查询源重试(用户正在等待)。 背景来源——摘要、分类——立即放弃,以避免放大容量级联。
  • 作品后备: 在非自定义 Opus 模型上连续 3 个 529 后,抛出 FallbackTriggeredError which queryLoop 捕获并切换到 fallbackModel.
  • OAuth 401: 通过强制刷新令牌 handleOAuth401Error() 在下一次尝试之前。
  • 上下文溢出 400: 从错误消息中解析令牌计数并计算新的 maxTokensOverride.
  • 持久模式(UNATTENDED_RETRY): 以 30 分钟的退避上限无限期地重试,每 30 秒生成一次心跳消息,以便主机不会因不活动而终止会话。
  • ECONNRESET/EPIPE: 检测到陈旧的保持活动套接字; disableKeepAlive() 在重试之前调用。
深入探讨:SSE 流 → AssistantMessage 重构

Anthropic 流 API 按以下顺序发送服务器发送的事件: message_start → 一个或多个 content_block_start / content_block_delta / content_block_stop 成对 → message_delta (最终使用+ stop_reason)→ message_stop.

queryModel 重建一个完整的 AssistantMessage 每个内容块对象并产生它。 用法在最后一条消息上就地改变一次 message_delta 到达 — 在流结束之前,最终的 stop_reason 和令牌计数不可用。

// From QueryEngine.ts — usage tracking
if (message.event.type === 'message_start') {
  currentMessageUsage = updateUsage(EMPTY_USAGE, message.event.message.usage)
}
if (message.event.type === 'message_delta') {
  currentMessageUsage = updateUsage(currentMessageUsage, message.event.usage)
  if (message.event.delta.stop_reason != null) {
    lastStopReason = message.event.delta.stop_reason
  }
}

一个微妙之处: tool_use 块包含它们的 JSON input 通过三角洲。 如果一个工具的 backfillObservableInput 方法将字段添加到输入(例如,扩展文件路径),仅 clone 消息的一部分被提供给观察者——原始消息在提示缓存中保持逐字节相同。

6. 上下文管理和 Autocompact

在每次 API 调用之前, queryLoop 以固定优先级顺序运行上下文缩减策略的管道:

  1. applyToolResultBudget() 限制单个工具结果的字节大小。 大型结果存储在外部并用参考存根替换。
  2. snipCompact(HISTORY_SNIP 功能) 当证明不需要旧消息时,从历史记录中删除旧消息,从而在没有完整摘要传递的情况下释放令牌。
  3. 微型/缓存微型 将连续的工具结果/用户消息对合并为简洁的摘要。 缓存变体使用 API 端缓存编辑来避免重新传输已删除的块。
  4. contextCollapse(CONTEXT_COLLAPSE 功能) 对 REPL 完整历史记录的读取时间投影。 每个条目都会发生分阶段崩溃; 模型会看到折叠视图,而 UI 保留了回滚的完整历史记录。
  5. autoCompact 当上下文接近阻塞限制时,通过分叉代理触发完整摘要。 如果它触发,则循环立即继续处理后压缩消息。
深入探讨:autocompact — 阈值、断路器和 task_budget

发生阻塞限制检查 after 所有压缩策略均已运行。 如果上下文仍然超过限制,则合成 PROMPT_TOO_LONG_ERROR_MESSAGE 产生并且循环有理由退出 blocking_limit — 用户必须手动运行 /compact.

反应式紧凑是由 API 中的真实 413(提示太长)触发的后备路径。 引擎在流式传输期间保留错误消息,然后尝试一次反应性压缩。 如果失败,则会出现错误并跳过停止钩子(以防止死亡螺旋)。

The task_budget 功能跟踪跨紧凑边界消耗的总上下文令牌。 当服务器总结历史时,它通常会低估预压缩的支出; taskBudgetRemaining 携带正确的跨界累计支出。

// task_budget carryover across compaction (query.ts ~508)
if (params.taskBudget) {
  const preCompactContext =
    finalContextTokensFromLastResponse(messagesForQuery)
  taskBudgetRemaining = Math.max(
    0,
    (taskBudgetRemaining ?? params.taskBudget.total) - preCompactContext,
  )
}

7. 停止 Hooks——回合后生命周期

模型完成后(无需工具调用,无需恢复),引擎调用 handleStopHooks() in query/stopHooks.ts。 停止挂钩是外部 shell 脚本或用户配置的命令。 他们每次转弯后都会奔跑,并且可以:

  • 产生阻塞错误 - 作为用户消息注入,触发另一个循环迭代
  • 防止继续 — 引擎返回 { reason: 'stop_hook_prevented' }
  • 火后台任务 — 提示建议、记忆提取、自动梦想 — 所有这些都是一劳永逸
深入探讨:Stop hooks、TeammateIdle、TaskCompleted 和“即发即忘”副作用

handleStopHooks() 按顺序运行三类钩子:

1. 停止 Hooks(总是)

注册通过 settings.json 挂钩配置。 并行运行; 每个结果都被收集为 hook_success, hook_non_blocking_error, 或者 hook_error_during_execution 依恋。 阻塞错误是任何钩子退出代码失败,其中钩子明确指示它应该阻塞。

2.TaskCompleted 钩子(仅限队友模式)

在队友模式(多代理设置)下,每个代理都会触发钩子 in_progress 该代理拥有的任务。 这些镜像停止钩子语义(可以阻塞,可以阻止继续)。

3. TeammateIdle 钩子(仅限队友模式)

当该队友进入空闲状态时触发。 也可以阻止或阻止继​​续。

4. 一劳永逸的后台任务

在裸模式下跳过(-p 旗帜)。 被解雇时没有 await 在交互模式下:

  • executePromptSuggestion — 生成 btw... suggestions
  • executeExtractMemories — 将事实提取到 MEMORY.md
  • executeAutoDream — 自主背景探索
// --bare / SIMPLE: skip background bookkeeping
// Scripted -p calls don't want auto-memory or forked agents
// contending for resources during shutdown.
if (!isBareMode()) {
  void executePromptSuggestion(stopHookContext)
  if (feature('EXTRACT_MEMORIES') && isExtractModeActive()) {
    void extractMemoriesModule!.executeExtractMemories(...)
  }
  if (!toolUseContext.agentId) {
    void executeAutoDream(...)
  }
}

8. 代币预算——自动继续功能

query/tokenBudget.ts 为 SDK 路径实现自动继续功能。 配置每回合令牌预算后,引擎会在每个干净模型停止后检查模型是否“用完”足够的预算。 如果没有,它会注入一条微移消息并再次循环。

深入探讨:预算跟踪器、阈值和收益递减检测
query/tokenBudget.tsconst COMPLETION_THRESHOLD = 0.9   // 90% used = done
const DIMINISHING_THRESHOLD = 500  // <500 new tokens = no progress

export function checkTokenBudget(
  tracker: BudgetTracker,
  agentId: string | undefined,
  budget: number | null,
  globalTurnTokens: number,
): TokenBudgetDecision {
  if (agentId || budget === null || budget <= 0) {
    return { action: 'stop', completionEvent: null }
  }
  const pct = Math.round((globalTurnTokens / budget) * 100)
  const isDiminishing =
    tracker.continuationCount >= 3 &&
    deltaSinceLastCheck < DIMINISHING_THRESHOLD &&
    tracker.lastDeltaTokens < DIMINISHING_THRESHOLD
  // Continue if under 90% AND not diminishing
  if (!isDiminishing && turnTokens < budget * COMPLETION_THRESHOLD) {
    return { action: 'continue', nudgeMessage: ... }
  }
  return { action: 'stop', ... }
}

决策逻辑有两个提前停止条件:

  • 预算用尽: 转代币≥预算的90%→停止
  • 收益递减: 3+ 延续后,如果当前 Delta 和前一个 Delta 均低于 500 个代币 → 停止(模型正在旋转)

微移消息作为 isMeta 用户消息,因此它不会出现在 REPL 记录中,并且循环继续 transition.reason = 'token_budget_continuation'.

9. 要点

一个循环,多种退出原因

The while(true) in queryLoop 通过键入退出 Terminal 价值。 每个可能的停止条件——完成、错误、中止、停止挂钩、预算——都有一个指定的原因。

发电机一路向下

submitMessage, query, queryLoop, queryModel, withRetry, handleStopHooks — 全部都是 async function*。 这让整个堆栈干净地组合在一起 yield* 背压自然流动。

成绩单优先的可靠性

用户消息在调用 API 之前写入磁盘。 即使发送和响应之间的进程终止也会留下可恢复的会话。

功能门控死代码消除

feature('HISTORY_SNIP'), feature('TOKEN_BUDGET'), feature('CONTEXT_COLLAPSE') 等在捆绑时由 Bun 进行评估,消除外部构建中无法访问的代码并防止字符串泄漏。

背景效果一劳永逸

记忆提取、提示建议、自动梦——全都有 void 承诺。 它们不得阻塞响应流,也不得裸露运行(-p) 模式,其中关闭时的资源争用很重要。

重试比指数退避更聪明

前台与后台源路由、快速模式冷却、OAuth 刷新、持久保持活动、Opus→3×529 后回退、上下文溢出令牌重新计算 — 重试层是一个小型状态机,而不仅仅是一个睡眠循环。

10. 测验

五个问题来检查您的理解情况。 选择一个答案,然后点击 Check.

1主要原因是什么 QueryEngine.submitMessage() 将转录内容写入磁盘 before calling query()?

2在非自定义 Opus 模型上连续 3 次出现 529(过载)错误后, withRetry() 抛出哪个错误?

3什么是 transition.reason 场上的 State 对象代表?

4在代币预算功能中,什么时候 checkTokenBudget() 触发一个 收益递减 早点停?

5当最后一条助理消息是 API 错误(例如,速率限制或提示太长)时,为什么停止挂钩逻辑会跳过正在运行的挂钩?

Claude Code 课程 — 第 04 课 课 — 查询引擎和 LLM API