Walkthrough | 手写 QueryEngine¶
难度:⭐⭐⭐⭐ 时间:~2h 目标:理解 QueryEngine 核心逻辑,写一个简化版
1. QueryEngine 是什么¶
QueryEngine = Claude Code 的消息循环核心。 - 接收用户输入 - 调 Claude API - 处理 tool_use - 流式更新 UI - 处理错误
2. 目标¶
手写一个简化版 QueryEngine: - 单轮对话 - 1 个 tool(Read) - 流式输出 - 基础错误处理
3. 完整代码¶
// mini-query-engine.ts
import Anthropic from '@anthropic-ai/sdk'
interface Message {
role: 'user' | 'assistant'
content: string | ContentBlock[]
}
interface ContentBlock {
type: 'text' | 'tool_use' | 'tool_result'
// ...
}
interface Tool {
name: string
description: string
inputSchema: any
execute: (input: any) => Promise<string>
}
export class MiniQueryEngine {
private client: Anthropic
private messages: Message[] = []
private tools: Tool[] = []
constructor(apiKey: string, model: string = 'claude-sonnet-4-6') {
this.client = new Anthropic({ apiKey })
this.model = model
}
registerTool(tool: Tool) {
this.tools.push(tool)
}
async ask(prompt: string): Promise<string> {
// 1. 加 user message
this.messages.push({ role: 'user', content: prompt })
// 2. 调 API
let response = await this.callAPI()
// 3. 循环处理 tool_use
while (this.hasToolUse(response)) {
// 4. 加 assistant message
this.messages.push({
role: 'assistant',
content: response.content,
})
// 5. 执行 tool
const toolResults = await this.executeTools(response.content)
// 6. 加 tool_result
this.messages.push({
role: 'user',
content: toolResults,
})
// 7. 调 API again
response = await this.callAPI()
}
// 8. 加 final assistant message
const text = this.extractText(response)
this.messages.push({
role: 'assistant',
content: text,
})
return text
}
private async callAPI() {
return await this.client.messages.create({
model: this.model,
max_tokens: 4096,
tools: this.tools.map((t) => ({
name: t.name,
description: t.description,
input_schema: t.inputSchema,
})),
messages: this.messages,
})
}
private hasToolUse(response: any): boolean {
return response.content.some((b: any) => b.type === 'tool_use')
}
private async executeTools(blocks: ContentBlock[]): Promise<ContentBlock[]> {
const results: ContentBlock[] = []
for (const block of blocks) {
if (block.type !== 'tool_use') continue
const tool = this.tools.find((t) => t.name === block.name)
if (!tool) {
results.push({
type: 'tool_result',
tool_use_id: block.id,
content: `Tool ${block.name} not found`,
is_error: true,
})
continue
}
try {
const output = await tool.execute(block.input)
results.push({
type: 'tool_result',
tool_use_id: block.id,
content: output,
})
} catch (e) {
results.push({
type: 'tool_result',
tool_use_id: block.id,
content: `Error: ${(e as Error).message}`,
is_error: true,
})
}
}
return results
}
private extractText(response: any): string {
return response.content
.filter((b: any) => b.type === 'text')
.map((b: any) => b.text)
.join('')
}
}
~120 行。
4. 使用示例¶
const engine = new MiniQueryEngine(process.env.ANTHROPIC_API_KEY!)
engine.registerTool({
name: 'Read',
description: 'Read a file',
inputSchema: {
type: 'object',
properties: { path: { type: 'string' } },
required: ['path'],
},
execute: async ({ path }) => {
return require('fs').readFileSync(path, 'utf-8')
},
})
const answer = await engine.ask('Read /etc/hostname')
console.log(answer)
简单。
5. 关键设计点¶
5.1 消息循环¶
循环 直到没有 tool_use。
5.2 Tool 执行¶
并行 推测(这里串行)。
5.3 错误处理¶
catch (e) {
results.push({
type: 'tool_result',
tool_use_id: block.id,
content: `Error: ${(e as Error).message}`,
is_error: true,
})
}
错误 → tool_result (is_error: true)。
6. 6 个扩展¶
6.1 流式输出¶
private async *streamAPI() {
const stream = await this.client.messages.create({
model: this.model,
max_tokens: 4096,
messages: this.messages,
stream: true,
})
for await (const event of stream) {
yield event
}
}
stream: true。
6.2 多 tool 并行¶
private async executeTools(blocks: ContentBlock[]): Promise<ContentBlock[]> {
return await Promise.all(
blocks
.filter((b) => b.type === 'tool_use')
.map((b) => this.executeOneTool(b))
)
}
Promise.all。
6.3 Tool 权限¶
private async checkPermission(tool: Tool, input: any): Promise<boolean> {
// 实现 6 层决策
return true // 或 false
}
6 层。
6.4 缓存优化¶
content hash。
6.5 工具子集¶
动态。
6.6 Session 持久化¶
持久化。
7. 5 个关键洞察¶
- 消息循环 是核心
- tool execution 是关键
- 错误处理 必走 tool_result
- 流式 用 stream: true
- 持久化 是 session 基础
8. 对比真实 QueryEngine¶
| 维度 | Mini | 真实 |
|---|---|---|
| 行数 | 120 | 4000+ |
| 工具 | 1 | 100+ |
| 模式 | 串行 | 并行 + 投机 |
| 流式 | 简单 | 复杂 |
| 错误 | 简单 | 25+ validator |
| 持久化 | 无 | 完整 |
简化 vs 真实。
9. 5 个练习¶
- 加 streaming —— 用
stream: true - 加 token 累加 —— 记录 input/output
- 加 tool 权限 —— 简单 allow/deny
- 加 session 持久化 —— save/load JSON
- 加 prompt cache —— 静态 system prompt
5 步。
10. 总结¶
手写 QueryEngine = 理解消息循环 + tool 执行 + 错误处理。
核心: - 120 行简化版 - 1 个 tool - 流式可选 - 错误隔离
下一步: - 看真实 deep-dive-query-engine.md - 加 streaming - 加多 tool