查询引擎
Claude Code 怎么样 QueryEngine 协调每个对话轮次——从第一条用户消息到流令牌、工具调用、重试、自动压缩和停止挂钩——然后再循环返回更多内容。
1. 大局观
当您向 Claude Code 发送消息时,它至少会通过 四个不同的层 在模型的回复到达您的终端之前。 了解这些层是理解 Claude Code 为什么会这样做的关键 - 为什么它会重试、为什么会压缩、为什么它可以在流中间运行工具。
-
QueryEngine.submitMessage() 验证提示,构建系统提示,解析模型,记录文字记录,然后移交给
query(). -
查询()→查询循环() An
async function*循环直到模型停止调用工具。 每次迭代都是一次模型调用。 -
查询模型/调用模型 通过 SDK 的流接口调用 Anthropic API,将所有内容包装在
withRetry(). -
停止挂钩和代币预算 模型完成每一轮后,外部钩子运行; 代币预算决定是否再次注入微移和循环。
query() 发电机。 生成器是轮流如何工作的唯一事实来源。
2. 时序图
以下是至少涉及一次工具调用的单个对话回合的完整消息流。 跟随箭头:之间的循环 queryLoop and queryModel 是主体行为的核心。
submitMessage() participant Q as query() /
queryLoop() participant QM as queryModel
(claude.ts) participant API as Anthropic API
(streaming) participant Tools as Tool
Executor participant SH as stopHooks.ts User->>QE: submitMessage(prompt) QE->>QE: fetchSystemPromptParts()
buildSystemInitMessage() QE->>Q: query({ messages, systemPrompt, ... }) Q->>Q: applyToolResultBudget()
microcompact / snip / autocompact loop queryLoop — one iteration per model call Q->>QM: callModel({ messages, tools, ... }) QM->>API: POST /v1/messages (streaming, withRetry) API-->>QM: stream: content_block_delta events QM-->>Q: yield AssistantMessage (text / tool_use blocks) Q->>Q: StreamingToolExecutor tracks tool_use blocks alt tool_use blocks present Q->>Tools: runTools(toolUseBlocks) Tools-->>Q: yield progress + tool_result UserMessages Q->>Q: append tool_results to messages Note over Q: needsFollowUp = true → loop continues else no tool calls Note over Q: needsFollowUp = false Q->>SH: handleStopHooks() SH-->>Q: yield hook progress/attachments alt hook blocking error Q->>Q: append blockingError, loop again else hook prevents continuation Q-->>QE: Terminal { reason: 'stop_hook_prevented' } else clean stop Q->>Q: checkTokenBudget() alt budget says continue Q->>Q: inject nudge message, loop again else budget says stop Q-->>QE: Terminal { reason: 'completed' } end end end end QE-->>User: yield SDKMessage stream
(assistant / user / result)
while (true) 在第 307 课 行 query.ts。 该循环的每次迭代都是一次 API 调用。
3. QueryEngine——每个对话一个引擎
QueryEngine 是一个 有状态类 每个对话实例化一次。 它保存可变消息历史记录、令牌使用总量、权限拒绝和中止控制器。 每次致电
submitMessage() 是该对话中的一个“回合”。
QueryEngine.ts (simplified)export class QueryEngine { private mutableMessages: Message[] private abortController: AbortController private totalUsage: NonNullableUsage private permissionDenials: SDKPermissionDenial[] // Turn-scoped: cleared at start of each submitMessage() call private discoveredSkillNames = new Set<string>() async *submitMessage( prompt: string | ContentBlockParam[], options?: { uuid?: string; isMeta?: boolean }, ): AsyncGenerator<SDKMessage> { // 1. Build system prompt (fetchSystemPromptParts) // 2. processUserInput — handles slash commands // 3. recordTranscript — persists BEFORE the API call // 4. yield* query({ messages, ... }) // 5. yield final result SDKMessage } }
为什么脚本要在 API 调用之前写入
Before query() 甚至被称为 submitMessage() 将用户的消息保存到磁盘。 这意味着会话是 resumable 即使该进程在模型响应之前被终止。 来源中的评论很有启发性:
// If the process is killed before that (e.g. user clicks Stop in
// cowork seconds after send), the transcript is left with only
// queue-operation entries; getLastSessionLog filters those out,
// returns null, and --resume fails with "No conversation found".
// Writing now makes the transcript resumable from the point the
// user message was accepted, even if no API response ever arrives.
QueryEngine 由 SDK/headless 路径使用。 REPL 有自己的接线 ask() 但调用相同 query() 下面的功能。
4. queryLoop() 内部——While(true) 核心
queryLoop() in query.ts 是一个 while(true)
带有类型的循环 State 迭代之间的对象。 而不是九个分开 let 变量,单个
state = { ... } 每次重新分配 继续网站 使转换变得明确且可审计。
query.tstype State = { messages: Message[] toolUseContext: ToolUseContext autoCompactTracking: AutoCompactTrackingState | undefined maxOutputTokensRecoveryCount: number hasAttemptedReactiveCompact: boolean maxOutputTokensOverride: number | undefined pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined stopHookActive: boolean | undefined turnCount: number transition: Continue | undefined // WHY we looped again }
继续过渡——循环的七个原因
The transition 现场记录 why 循环继续。 这使得继续站点能够自我记录,并让测试断言触发了哪个恢复路径,而无需检查消息内容:
| transition.reason | Meaning |
|---|---|
max_output_tokens_escalate | 首创8k上限; 以 64k max_tokens 重试 |
max_output_tokens_recovery | 模型达到输出极限; 注入恢复微移(最多 3 倍) |
reactive_compact_retry | 提示-太长→压缩历史→重试 |
collapse_drain_retry | 提示太长→耗尽上下文崩溃阶段→重试 |
stop_hook_blocking | 停止钩子返回阻塞错误; 重新查询,错误为用户消息 |
token_budget_continuation | 代币预算表明工作尚未完成; 注入微移并继续 |
| (需要后续跟进) | 正常:模型返回tool_use块→运行工具→循环 |
Terminal) 在: completed,
blocking_limit, model_error, prompt_too_long,
aborted_streaming, stop_hook_prevented,
image_error。 每个映射到不同的用户可见结果。
5. 流媒体和 API 层
queryModel in claude.ts 是一个 async function*
调用 Anthropic beta 消息端点并将每个流事件重新生成为内部 AssistantMessage or StreamEvent.
query.ts (inner stream loop, simplified)for await (const message of deps.callModel({ messages: prependUserContext(messagesForQuery, userContext), systemPrompt: fullSystemPrompt, thinkingConfig: toolUseContext.options.thinkingConfig, tools: toolUseContext.options.tools, signal: toolUseContext.abortController.signal, options: { model: currentModel, fallbackModel, ... }, })) { if (message.type === 'assistant') { assistantMessages.push(message) // tool_use blocks trigger needsFollowUp = true const toolBlocks = message.message.content .filter(b => b.type === 'tool_use') if (toolBlocks.length > 0) needsFollowUp = true } yield yieldMessage // surfaces to SDK caller / REPL }
流媒体工具执行
When config.gates.streamingToolExecution 已启用,一个
StreamingToolExecutor 消防工具 当流仍然打开时。 输入较早到达的工具开始与仍然生成文本的模型并行执行,从而减少多工具轮流的延迟。
AssistantMessage 对象是 tombstoned — 发动机产量
{ type: 'tombstone', message } 因此 UI 和脚本可以删除它们。 这可以防止重试时出现“思维块无法修改”API 错误。
深入探讨:withRetry() — 指数退避、529 秒和 OAuth 刷新
每个 API 调用都会经过 withRetry() in
services/api/withRetry.ts。 该函数是一个
async function* 重试最多
DEFAULT_MAX_RETRIES = 10 默认情况下,产生一个
SystemAPIErrorMessage 每次睡眠前,用户都会看到实时状态更新。
// Backoff formula (from withRetry.ts) export function getRetryDelay( attempt: number, retryAfterHeader?: string | null, maxDelayMs = 32000, ): number { if (retryAfterHeader) { const seconds = parseInt(retryAfterHeader, 10) if (!isNaN(seconds)) return seconds * 1000 } const baseDelay = Math.min( BASE_DELAY_MS * Math.pow(2, attempt - 1), maxDelayMs, ) const jitter = Math.random() * 0.25 * baseDelay return baseDelay + jitter }
关键重试决策规则:
- 529(超载): 仅前台查询源重试(用户正在等待)。 背景来源——摘要、分类——立即放弃,以避免放大容量级联。
- 作品后备: 在非自定义 Opus 模型上连续 3 个 529 后,抛出
FallbackTriggeredErrorwhichqueryLoop捕获并切换到fallbackModel. - OAuth 401: 通过强制刷新令牌
handleOAuth401Error()在下一次尝试之前。 - 上下文溢出 400: 从错误消息中解析令牌计数并计算新的
maxTokensOverride. - 持久模式(UNATTENDED_RETRY): 以 30 分钟的退避上限无限期地重试,每 30 秒生成一次心跳消息,以便主机不会因不活动而终止会话。
- ECONNRESET/EPIPE: 检测到陈旧的保持活动套接字;
disableKeepAlive()在重试之前调用。
深入探讨:SSE 流 → AssistantMessage 重构
Anthropic 流 API 按以下顺序发送服务器发送的事件:
message_start → 一个或多个 content_block_start /
content_block_delta / content_block_stop 成对 →
message_delta (最终使用+ stop_reason)→
message_stop.
queryModel 重建一个完整的 AssistantMessage
每个内容块对象并产生它。 用法在最后一条消息上就地改变一次 message_delta 到达 — 在流结束之前,最终的 stop_reason 和令牌计数不可用。
// From QueryEngine.ts — usage tracking if (message.event.type === 'message_start') { currentMessageUsage = updateUsage(EMPTY_USAGE, message.event.message.usage) } if (message.event.type === 'message_delta') { currentMessageUsage = updateUsage(currentMessageUsage, message.event.usage) if (message.event.delta.stop_reason != null) { lastStopReason = message.event.delta.stop_reason } }
一个微妙之处: tool_use 块包含它们的 JSON
input 通过三角洲。 如果一个工具的
backfillObservableInput 方法将字段添加到输入(例如,扩展文件路径),仅 clone 消息的一部分被提供给观察者——原始消息在提示缓存中保持逐字节相同。
6. 上下文管理和 Autocompact
在每次 API 调用之前, queryLoop 以固定优先级顺序运行上下文缩减策略的管道:
-
applyToolResultBudget() 限制单个工具结果的字节大小。 大型结果存储在外部并用参考存根替换。
-
snipCompact(HISTORY_SNIP 功能) 当证明不需要旧消息时,从历史记录中删除旧消息,从而在没有完整摘要传递的情况下释放令牌。
-
微型/缓存微型 将连续的工具结果/用户消息对合并为简洁的摘要。 缓存变体使用 API 端缓存编辑来避免重新传输已删除的块。
-
contextCollapse(CONTEXT_COLLAPSE 功能) 对 REPL 完整历史记录的读取时间投影。 每个条目都会发生分阶段崩溃; 模型会看到折叠视图,而 UI 保留了回滚的完整历史记录。
-
autoCompact 当上下文接近阻塞限制时,通过分叉代理触发完整摘要。 如果它触发,则循环立即继续处理后压缩消息。
深入探讨:autocompact — 阈值、断路器和 task_budget
发生阻塞限制检查 after 所有压缩策略均已运行。 如果上下文仍然超过限制,则合成
PROMPT_TOO_LONG_ERROR_MESSAGE 产生并且循环有理由退出 blocking_limit — 用户必须手动运行
/compact.
反应式紧凑是由 API 中的真实 413(提示太长)触发的后备路径。 引擎在流式传输期间保留错误消息,然后尝试一次反应性压缩。 如果失败,则会出现错误并跳过停止钩子(以防止死亡螺旋)。
The task_budget 功能跟踪跨紧凑边界消耗的总上下文令牌。 当服务器总结历史时,它通常会低估预压缩的支出; taskBudgetRemaining
携带正确的跨界累计支出。
// task_budget carryover across compaction (query.ts ~508) if (params.taskBudget) { const preCompactContext = finalContextTokensFromLastResponse(messagesForQuery) taskBudgetRemaining = Math.max( 0, (taskBudgetRemaining ?? params.taskBudget.total) - preCompactContext, ) }
7. 停止 Hooks——回合后生命周期
模型完成后(无需工具调用,无需恢复),引擎调用
handleStopHooks() in query/stopHooks.ts。 停止挂钩是外部 shell 脚本或用户配置的命令。 他们每次转弯后都会奔跑,并且可以:
- 产生阻塞错误 - 作为用户消息注入,触发另一个循环迭代
- 防止继续 — 引擎返回
{ reason: 'stop_hook_prevented' } - 火后台任务 — 提示建议、记忆提取、自动梦想 — 所有这些都是一劳永逸
深入探讨:Stop hooks、TeammateIdle、TaskCompleted 和“即发即忘”副作用
handleStopHooks() 按顺序运行三类钩子:
1. 停止 Hooks(总是)
注册通过 settings.json 挂钩配置。 并行运行; 每个结果都被收集为 hook_success,
hook_non_blocking_error, 或者 hook_error_during_execution
依恋。 阻塞错误是任何钩子退出代码失败,其中钩子明确指示它应该阻塞。
2.TaskCompleted 钩子(仅限队友模式)
在队友模式(多代理设置)下,每个代理都会触发钩子
in_progress 该代理拥有的任务。 这些镜像停止钩子语义(可以阻塞,可以阻止继续)。
3. TeammateIdle 钩子(仅限队友模式)
当该队友进入空闲状态时触发。 也可以阻止或阻止继续。
4. 一劳永逸的后台任务
在裸模式下跳过(-p 旗帜)。 被解雇时没有 await
在交互模式下:
executePromptSuggestion— 生成 btw... suggestionsexecuteExtractMemories— 将事实提取到 MEMORY.mdexecuteAutoDream— 自主背景探索
// --bare / SIMPLE: skip background bookkeeping // Scripted -p calls don't want auto-memory or forked agents // contending for resources during shutdown. if (!isBareMode()) { void executePromptSuggestion(stopHookContext) if (feature('EXTRACT_MEMORIES') && isExtractModeActive()) { void extractMemoriesModule!.executeExtractMemories(...) } if (!toolUseContext.agentId) { void executeAutoDream(...) } }
8. 代币预算——自动继续功能
query/tokenBudget.ts 为 SDK 路径实现自动继续功能。 配置每回合令牌预算后,引擎会在每个干净模型停止后检查模型是否“用完”足够的预算。 如果没有,它会注入一条微移消息并再次循环。
深入探讨:预算跟踪器、阈值和收益递减检测
query/tokenBudget.tsconst COMPLETION_THRESHOLD = 0.9 // 90% used = done const DIMINISHING_THRESHOLD = 500 // <500 new tokens = no progress export function checkTokenBudget( tracker: BudgetTracker, agentId: string | undefined, budget: number | null, globalTurnTokens: number, ): TokenBudgetDecision { if (agentId || budget === null || budget <= 0) { return { action: 'stop', completionEvent: null } } const pct = Math.round((globalTurnTokens / budget) * 100) const isDiminishing = tracker.continuationCount >= 3 && deltaSinceLastCheck < DIMINISHING_THRESHOLD && tracker.lastDeltaTokens < DIMINISHING_THRESHOLD // Continue if under 90% AND not diminishing if (!isDiminishing && turnTokens < budget * COMPLETION_THRESHOLD) { return { action: 'continue', nudgeMessage: ... } } return { action: 'stop', ... } }
决策逻辑有两个提前停止条件:
- 预算用尽: 转代币≥预算的90%→停止
- 收益递减: 3+ 延续后,如果当前 Delta 和前一个 Delta 均低于 500 个代币 → 停止(模型正在旋转)
微移消息作为 isMeta 用户消息,因此它不会出现在 REPL 记录中,并且循环继续
transition.reason = 'token_budget_continuation'.
9. 要点
一个循环,多种退出原因
The while(true) in queryLoop 通过键入退出
Terminal 价值。 每个可能的停止条件——完成、错误、中止、停止挂钩、预算——都有一个指定的原因。
发电机一路向下
submitMessage, query, queryLoop,
queryModel, withRetry, handleStopHooks
— 全部都是 async function*。 这让整个堆栈干净地组合在一起 yield* 背压自然流动。
成绩单优先的可靠性
用户消息在调用 API 之前写入磁盘。 即使发送和响应之间的进程终止也会留下可恢复的会话。
功能门控死代码消除
feature('HISTORY_SNIP'), feature('TOKEN_BUDGET'),
feature('CONTEXT_COLLAPSE') 等在捆绑时由 Bun 进行评估,消除外部构建中无法访问的代码并防止字符串泄漏。
背景效果一劳永逸
记忆提取、提示建议、自动梦——全都有 void
承诺。 它们不得阻塞响应流,也不得裸露运行(-p) 模式,其中关闭时的资源争用很重要。
重试比指数退避更聪明
前台与后台源路由、快速模式冷却、OAuth 刷新、持久保持活动、Opus→3×529 后回退、上下文溢出令牌重新计算 — 重试层是一个小型状态机,而不仅仅是一个睡眠循环。
10. 测验
五个问题来检查您的理解情况。 选择一个答案,然后点击 Check.