Deep Dive | QueryEngine.ts 1295 行 class 状态机拆解¶
重要性:⭐⭐⭐⭐(理解 Claude Code 状态机的最佳范本) 真实位置:
src/QueryEngine.ts(1295 行) 核心组成: - 行 1-128:imports(40+ 业务模块) - 行 130-183:QueryEngineConfig 类型(54 行) - 行 184-1185:class QueryEngine(1001 行) - 行 1186-1295:顶层ask()函数(110 行)
1. 文件结构总览¶
QueryEngine.ts (1295 行)
│
├── 行 1-128 :imports(40+ 业务模块)
├── 行 130-183:QueryEngineConfig 类型(54 行的"全功能配置")
├── 行 184 :class 注释(说明设计意图)
├── 行 200 :constructor(config)
├── 行 209 :async *submitMessage(...) — 每轮对话入口
├── 行 1186 :顶层 ask() 函数(兼容旧 API)
└── 行 1295 :EOF
2. 行 1-128:Imports —— 40+ 业务模块¶
import { feature } from 'bun:bundle'
import type { ContentBlockParam } from '@anthropic-ai/sdk/resources/messages.mjs'
import { randomUUID } from 'crypto'
import last from 'lodash-es/last.js'
// 会话状态
import { getSessionId, isSessionPersistenceDisabled } from 'src/bootstrap/state.js'
// SDK 类型(Claude Code 作为库被嵌入时的接口)
import type { SDKCompactBoundaryMessage, SDKMessage, SDKStatus, ... } from 'src/entrypoints/agentSdkTypes.js'
// API 客户端
import { accumulateUsage, updateUsage } from 'src/services/api/claude.js'
import type { NonNullableUsage } from 'src/services/api/logging.js'
import { EMPTY_USAGE } from 'src/services/api/logging.js'
import stripAnsi from 'strip-ansi'
// 业务模块
import type { Command } from './commands.js'
import { getSlashCommandToolSkills } from './commands.js'
import { LOCAL_COMMAND_STDERR_TAG, LOCAL_COMMAND_STDOUT_TAG } from './constants/xml.js'
import { getModelUsage, getTotalAPIDuration, getTotalCost } from './cost-tracker.js'
import type { CanUseToolFn } from './hooks/useCanUseTool.js'
// 记忆
import { loadMemoryPrompt } from './memdir/memdir.js'
import { hasAutoMemPathOverride } from './memdir/paths.js'
// agent 循环
import { query } from './query.js'
import { categorizeRetryableAPIError } from './services/api/errors.js'
// MCP
import type { MCPServerConnection } from './services/mcp/types.js'
// State
import type { AppState } from './state/AppState.js'
// Tool 系统
import { type Tools, type ToolUseContext, toolMatchesName } from './Tool.js'
import type { AgentDefinition } from './tools/AgentTool/loadAgentsDir.js'
import { SYNTHETIC_OUTPUT_TOOL_NAME } from './tools/SyntheticOutputTool/SyntheticOutputTool.js'
// 消息
import type { Message } from './types/message.js'
import type { OrphanedPermission } from './types/textInputTypes.js'
// 工具函数
import { createAbortController } from './utils/abortController.js'
import type { AttributionState } from './utils/commitAttribution.js'
import { getGlobalConfig } from './utils/config.js'
import { getCwd } from './utils/cwd.js'
import { isBareMode, isEnvTruthy } from './utils/envUtils.js'
import { getFastModeState } from './utils/fastMode.js'
// ... 还有 20+ 业务模块
关键洞察:QueryEngine 是个 "控制中心" —— 它知道整个项目的所有业务模块,但只 import 类型 + 调函数。没有继承、没有依赖注入框架(直接接 config)。
3. 行 130-183:QueryEngineConfig 类型(54 行)¶
3.1 完整类型定义¶
export type QueryEngineConfig = {
// 必需:会话基础
cwd: string
tools: Tools
commands: Command[]
mcpClients: MCPServerConnection[]
agents: AgentDefinition[]
canUseTool: CanUseToolFn
// 必需:State 桥接
getAppState: () => AppState
setAppState: (f: (prev: AppState) => AppState) => void
// 可选:历史消息
initialMessages?: Message[]
// 可选:文件缓存
readFileCache: FileStateCache
// 可选:System prompt
customSystemPrompt?: string
appendSystemPrompt?: string
// 可选:模型选择
userSpecifiedModel?: string
fallbackModel?: string
thinkingConfig?: ThinkingConfig
// 可选:限制
maxTurns?: number
maxBudgetUsd?: number
taskBudget?: { total: number }
// 可选:结构化输出
jsonSchema?: Record<string, unknown>
// 可选:调试
verbose?: boolean
replayUserMessages?: boolean
// 可选:MCP elicitation
handleElicitation?: ToolUseContext['handleElicitation']
// 可选:流式输出
includePartialMessages?: boolean
setSDKStatus?: (status: SDKStatus) => void
// 可选:取消
abortController?: AbortController
// 可选:孤儿权限
orphanedPermission?: OrphanedPermission
// 可选:历史裁剪
snipReplay?: (
yieldedSystemMsg: Message,
store: Message[],
) => { messages: Message[]; executed: boolean } | undefined
}
3.2 关键设计¶
1. State 是依赖注入的
不是 直接 import store,而是接收 getter/setter。
好处:
- 测试时可注入 mock state
- 解耦:QueryEngine 不依赖具体 store 实现
- 可同时支持多个 store 实例(REPL、SDK 各自一个)
2. 25 个可选字段 + 7 个必需字段 = 32 个配置点
为什么这么多? - 每个特性都是 opt-in(默认不启用) - 兼容多种使用场景(REPL / SDK / 测试) - 不影响未启用的代码路径
3. snipReplay 字段揭示了高级特性
注释说:"Snip-boundary handler: receives each yielded system message plus the current mutableMessages store. Returns undefined if the message is not a snip boundary..."
"Snip" = 上下文裁剪。长会话时裁剪历史消息以节省 token。
仅 SDK 模式启用(REPL 因为 UI 需要滚动保留全历史,不裁剪)。
4. setSDKStatus 暴露生命周期事件
SDK 消费者可以监听 compacting / idle / running 等状态。
这是 SDK 的"事件回调" —— 让外部知道内部在做什么。
4. class QueryEngine 注释(行 184-199)¶
/**
* QueryEngine owns the query lifecycle and session state for a conversation.
* It extracts the core logic from ask() into a standalone class that can be
* used by both the headless/SDK path and (in a future phase) the REPL.
*
* One QueryEngine per conversation. Each submitMessage() call starts a new
* turn within the same conversation. State (messages, file cache, usage, etc.)
* persists across turns.
*/
3 个关键设计: 1. "一个对话 = 一个 QueryEngine" —— 生命周期模型 2. "submitMessage 启动新轮" —— 多轮对话模型 3. "State 在多轮之间持续" —— 持久化模型
前端类比:和数据库连接的"长连接"模型一样。连接 = 一段对话,query = 一轮请求。
5. 行 200-208:constructor¶
constructor(config: QueryEngineConfig) {
this.config = config
this.mutableMessages = config.initialMessages ?? []
this.abortController = config.abortController ?? createAbortController()
this.permissionDenials = []
this.readFileState = config.readFileCache
this.totalUsage = EMPTY_USAGE
}
5 行初始化 5 个状态:
| 字段 | 类型 | 初始值 | 含义 |
|---|---|---|---|
| config | QueryEngineConfig | config | 配置(注入)|
| mutableMessages | Message[] | [] 或 config.initialMessages | 消息历史(mutable)|
| abortController | AbortController | 新建 | 取消信号 |
| permissionDenials | SDKPermissionDenial[] | [] | 拒绝历史(避免重问)|
| readFileState | FileStateCache | config.readFileCache | 文件读取缓存 |
| totalUsage | NonNullableUsage | EMPTY_USAGE | token / 费用累计 |
关键洞察:mutableMessages —— 虽然是 mutable,但对外暴露为 read-only(DeepImmutable 类型)。
为什么用 mutable:
- 避免每次 setState 都 array spread(性能)
- 注释里多次提到 "avoids array spreading per message"
性能 trade-off: - ✅ 不创建新数组(节省 GC) - ❌ 需要手动维护不变性 - 约定:外部通过 setAppState 修改,不能直接 push。
6. 行 209:submitMessage() —— 核心方法¶
async *submitMessage(
prompt: string | ContentBlockParam[],
options?: { uuid?: string; isMeta?: boolean },
): AsyncGenerator<SDKMessage, void, unknown>
签名分析:
- prompt: string | ContentBlockParam[] —— 用户输入(纯文本 OR 富内容)
- options.uuid —— 消息 ID(用于追踪)
- options.isMeta —— 是否元消息(不显示给用户)
- 返回 AsyncGenerator<SDKMessage> —— 流式输出 SDK 消息
关键:submitMessage 是 async generator(不是 Promise)—— 流式返回 SDK 消息。
6.1 推测的内部流程¶
async *submitMessage(prompt, options) {
// 1. 检查 abort
if (this.abortController.signal.aborted) {
return
}
// 2. 处理 prompt(如果含 slash command,提取命令)
if (prompt.startsWith('/')) {
const cmdResult = await this.handleSlashCommand(prompt)
if (cmdResult.handled) {
yield* cmdResult.messages
return
}
}
// 3. 追加用户消息
this.mutableMessages.push({
role: 'user',
content: prompt,
uuid: options?.uuid ?? randomUUID(),
})
// 4. 持久化
if (!isSessionPersistenceDisabled()) {
await this.persistSession()
}
// 5. 跑 agent 循环
for await (const event of query(
this.mutableMessages,
this.buildSystemPrompt(),
this.buildToolUseContext(),
this.config.canUseTool,
{ /* ... */ }
)) {
// 6. 转换 SDK 消息 + 累计 usage
this.processQueryEvent(event)
yield event
}
}
关键设计点:
- slash command 在 submitMessage 内部处理(不是单独的"命令模式")
- 持久化是异步的,但不阻塞流式输出
- agent 循环用底层 query()(query.ts 的函数)
7. class 内部状态机(推测)¶
stateDiagram-v2
[*] --> Created: new QueryEngine(config)
Created: 5 个字段初始化
Created --> Idle: ready
Idle: 等待用户输入
Idle --> Submitting: submitMessage()
Submitting: 处理 prompt
Submitting --> SlashCommand: prompt 是 /xxx
Submitting --> UserMessage: prompt 是普通输入
SlashCommand: 调命令处理器
SlashCommand --> Persisting: 命令产生消息
SlashCommand --> Idle: 命令无副作用
UserMessage: 追加 user message
UserMessage --> Persisting
Persisting: 写 sessionStorage
Persisting --> Querying
Querying: 跑 query() 流
Querying --> Yielding: 流式返回
Yielding --> Querying: 继续
Querying --> Compact: 触发 compact
Compact --> Querying: 完成
Querying --> Idle: query() 结束
Idle --> Aborted: abortController.abort()
Aborted --> [*]
Idle --> [*]: 关闭
关键状态: - Created → Idle → Submitting → Persisting → Querying → Idle(循环) - 任何状态 都可以 → Aborted(用户取消)
8. ask() 顶层函数(行 1186-1295,110 行)¶
8.1 签名¶
8.2 推测的实现¶
export async function* ask(config: AskConfig): AsyncGenerator<SDKMessage> {
// 1. 构造 QueryEngine
const engine = new QueryEngine({
cwd: config.cwd,
tools: config.tools,
// ... 30+ 字段映射
})
// 2. 提交第一轮
yield* engine.submitMessage(config.prompt, {
uuid: config.uuid,
})
}
为什么需要 ask() 函数 + QueryEngine class:
QueryEngineclass —— 有状态,可复用(多轮对话)ask()函数 —— 无状态,一次性(首轮对话 + 兼容旧 API)
前端类比:
- class QueryEngine ≈ useState(带状态)
- function ask() ≈ fetch(无状态调用)
SDK 消费者更倾向 ask()(简单一次性调用),
REPL 消费者更倾向 QueryEngine class(多轮对话)。
9. QueryEngine 内部机制详解¶
9.1 拒绝追踪(permissionDenials)¶
作用:避免重复询问用户已经拒绝过的工具调用。
// 推测
if (this.hasDeniedBefore(toolName, input)) {
// 直接拒绝,不再询问
return { behavior: 'deny', message: 'Already denied' }
}
9.2 token 累计(totalUsage)¶
每次 API 响应 都累加 input_tokens、output_tokens、cost。
配套:
- getTotalAPIDuration() —— 总 API 调用时长
- getTotalCost() —— 总费用
- cost-tracker.ts 的 getModelUsage() —— 按模型分
9.3 文件状态缓存(readFileState)¶
避免重复读同一个文件。
// 推测
const content = this.readFileState.get(path) ?? await fs.readFile(path)
this.readFileState.set(path, content)
配套:utils/fileStateCache.ts 有 createFileStateCacheWithSizeLimit 等。
9.4 历史裁剪(snipReplay)¶
snipReplay?: (
yieldedSystemMsg: Message,
store: Message[],
) => { messages: Message[]; executed: boolean } | undefined
SDK 模式专用 —— 长会话时裁剪历史以节省 token。
触发:每次 yield 系统消息时调 snipReplay。
- 返回 undefined —— 不裁剪
- 返回 { messages, executed } —— 用 messages 替换 store 中的某些消息
为什么仅 SDK 启用:REPL 需要全历史供 UI 滚动。
9.5 孤儿权限处理(orphanedPermission)¶
当 submitMessage 启动时有未处理的权限请求("孤儿"),自动恢复而不是新提问。
if (this.config.orphanedPermission && !this.hasHandledOrphanedPermission) {
this.hasHandledOrphanedPermission = true
yield* this.recoverOrphanedPermission(this.config.orphanedPermission)
}
10. class 方法完整列表(推测)¶
| 方法 | 角色 |
|---|---|
constructor(config) |
初始化 |
submitMessage(prompt, options) |
启动一轮对话(主入口) |
persistSession() |
写 sessionStorage |
recoverOrphanedPermission(p) |
恢复孤儿权限 |
buildSystemPrompt() |
构造 system prompt |
buildToolUseContext() |
构造 ToolUseContext |
processQueryEvent(event) |
处理 query() 事件 + 累计 usage |
handleSlashCommand(prompt) |
处理 /xxx 命令 |
hasDeniedBefore(tool, input) |
检查拒绝历史 |
accumulateUsage(response) |
累加 usage |
总方法数推测:15-20 个。
11. class 生命周期详解¶
[创建]
new QueryEngine(config)
→ 5 个字段初始化
[第 1 轮]
for await (msg of engine.submitMessage(prompt1)) {
// 处理 msg
}
// query() 返回,submitMessage 完成
// mutableMessages 包含 user + assistant 消息
[第 2 轮]
for await (msg of engine.submitMessage(prompt2)) {
// 处理 msg
}
// mutableMessages 现在有 user1 + assistant1 + user2 + assistant2
[关闭]
engine.abortController.abort() // 可选
// 没有显式 close —— GC 自动清理
关键: - 每次 submitMessage 是独立的"流" - mutableMessages 跨调用持续(class 实例的字段) - 没有 explicit close —— 显式资源释放
12. ask() vs QueryEngine¶
| 维度 | ask() 函数 |
QueryEngine class |
|---|---|---|
| 状态 | 无状态(每次新构造) | 有状态(跨 submitMessage 持续) |
| 用途 | 一次性对话 | 多轮对话 |
| 复杂度 | 简单调用 | 复杂生命周期 |
| SDK 友好度 | ⭐⭐⭐⭐ | ⭐⭐ |
| REPL 友好度 | ⭐⭐ | ⭐⭐⭐⭐⭐ |
| 测试友好度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
结论:
- SDK 用 ask()("调一次完事")
- REPL 用 QueryEngine("长连接持续")
注释里说"未来阶段 REPL 可能用 QueryEngine" —— 现在 REPL 还没切到 class(可能因为迁移成本),但设计意图清晰。
13. 关键洞察¶
13.1 class vs function 的"哲学选择"¶
Claude Code 在大多数地方默认函数,少数用 class。
class 的判定标准:"跨调用状态"或"长生命周期"。
QueryEngine 是"长生命周期" → class。
13.2 State 注入 vs State 导入¶
QueryEngine 通过 getAppState / setAppState 注入 state,不直接 import store。
好处: - 可测试(mock state 注入) - 可多实例(REPL 一个,SDK 一个) - 不耦合(state 实现可换)
前端类比:和 React Context Provider 的"提升状态到公共祖先"是同种思路。
13.3 异步生成器 + class 的"双剑合璧"¶
- class 管理"长生命周期状态"
- async generator 提供"流式输出"
两者结合 = "持续状态 + 流式响应" —— Claude Code 的核心抽象。
13.4 配置驱动的工程哲学¶
54 行的 QueryEngineConfig 看似夸张,但每个字段都是"必填 vs 选填"显式。
好处:
- TypeScript 防止缺字段
- 默认值明确(??)
- 新功能加 config 字段即可,不动 class 内部
13.5 snipReplay 的"SDK 专属"设计¶
注释明确说:"SDK-only: the REPL keeps full history for UI scrollback..."
设计哲学:不同使用场景有不同 trade-off。 - SDK:长期 headless → 裁剪历史省 token - REPL:有 UI 滚动 → 保留全历史
同一个 class 通过 config 区分。
14. 实战:用 QueryEngine 写一个 SDK 客户端¶
import { QueryEngine, ask } from './QueryEngine.js'
// 方式 1:一次性对话(用 ask())
async function oneShot() {
for await (const msg of ask({
cwd: process.cwd(),
tools: [...],
commands: [...],
// ... config
prompt: 'Hello',
})) {
console.log(msg)
}
}
// 方式 2:多轮对话(用 QueryEngine class)
async function multiTurn() {
const engine = new QueryEngine({
cwd: process.cwd(),
tools: [...],
commands: [...],
// ... config
})
for await (const msg of engine.submitMessage('Hello')) {
console.log(msg)
}
for await (const msg of engine.submitMessage('How are you?')) {
console.log(msg)
}
}
15. 阅读清单¶
- ✅ 完整通读
src/QueryEngine.ts(1295 行) - ✅ 读 phase-06-agent-loop.md 配合
- 📌 读
src/query.ts(query() 函数) - 📌 读
src/state/AppStateStore.ts(深度专题) - 📌 读
src/entrypoints/agentSdkTypes.ts(SDK 类型)
16. 练习任务¶
- 数 class 的方法数 —— 完整列出(推测 15-20 个)
- 画 class 字段的关系图 ——
mutableMessages、readFileState、totalUsage怎么关联 - 设计你自己的 SDK 客户端 —— 用
ask()函数还是QueryEngineclass?为什么? - 思考:class 状态机的"显式状态转换"和"函数式状态机"(如 React useReducer)哪个好?Claude Code 为什么选 class?