Walkthrough | 手写 5 行 agent 循环(阶段 6 练习答案)¶
对应练习:phase-06-agent-loop.md 6.11 练习任务 1 关联:topics/async-generator-pattern.md
目标¶
写一个真正流式的最小 agent 循环: - 接收 user message - 调 mock API(流式响应) - 如果返回 tool_use,调 mock 工具 - 把结果回传 - 重复直到 LLM 结束
用 async function* 实现,5 行核心 + 边界。
步骤 1:理解 async generator 流式循环¶
[LLM stream response] →
每次 yield 一个 event:
- content_block_delta (text)
- tool_use
- message_stop
直到 LLM 结束
步骤 2:完整实现¶
// 完整 mock 实现(带边界 ~50 行)
type Role = 'user' | 'assistant'
type Message = { role: Role; content: string; toolCall?: ToolCall }
type ToolCall = { name: string; input: unknown }
type StreamEvent =
| { type: 'text_delta'; text: string }
| { type: 'tool_use'; tool: ToolCall }
| { type: 'done' }
// 1. Mock API(流式响应)
async function* mockLLM(messages: Message[]): AsyncGenerator<StreamEvent> {
const last = messages[messages.length - 1]
if (last.content === 'list') {
yield { type: 'text_delta', text: 'I will list files. ' }
yield { type: 'tool_use', tool: { name: 'Bash', input: { cmd: 'ls' } } }
} else {
yield { type: 'text_delta', text: 'You said: ' }
yield { type: 'text_delta', text: last.content }
yield { type: 'done' }
}
}
// 2. Mock 工具
async function mockTool(name: string, input: unknown): Promise<string> {
if (name === 'Bash') return 'file1.txt\nfile2.txt'
return 'unknown tool'
}
// 3. ⭐ 5 行核心循环
async function* agent(messages: Message[]): AsyncGenerator<StreamEvent, void, void> {
while (true) { // 1. 循环
let toolUse: ToolCall | null = null // 2. 暂存 tool_use
for await (const event of mockLLM(messages)) { // 3. 透传每个 event
if (event.type === 'tool_use') toolUse = event.tool // 4. 抓 tool_use
yield event // 5. 立刻 yield 给消费者
}
if (!toolUse) return // 6. LLM 结束 → 退出
const result = await mockTool(toolUse.name, toolUse.input) // 7. 跑工具
messages = [...messages, // 8. 追加 tool_result
{ role: 'user', content: `Tool ${toolUse.name}: ${result}` }]
yield { type: 'text_delta', text: `\n[tool result: ${result}]\n` } // 9. 反馈
}
}
// 4. 测试
async function main() {
const messages: Message[] = [{ role: 'user', content: 'list' }]
for await (const event of agent(messages)) {
if (event.type === 'text_delta') process.stdout.write(event.text)
if (event.type === 'tool_use') console.log('\n[calling tool]', event.tool)
}
}
main()
输出:
I will list files.
[calling tool] { name: 'Bash', input: { cmd: 'ls' } }
[tool result: file1.txt
file2.txt]
步骤 3:精简到 5 行¶
把核心循环的 5 行突出:
async function* agent(messages) {
while (true) { // 行 1
let toolUse = null // 行 2
for await (const event of mockLLM(messages)) { // 行 3
if (event.type === 'tool_use') toolUse = event.tool // 行 4
yield event // 行 5
}
if (!toolUse) return // 退出
// ... 处理工具 + 继续
}
}
5 行核心就是整个 Claude Code agent 循环的"骨架"。
步骤 4:和 Claude Code 真实代码对比¶
// src/query.ts:241(简化)
async function* queryLoop(
messages: Message[],
systemPrompt: SystemPrompt,
context: ToolUseContext,
canUseTool: CanUseToolFn,
options: QueryOptions
): AsyncGenerator<StreamEvent, void, void> {
let currentMessages = messages
let attempt = 0
while (true) { // ← 我们的行 1
const stream = await callApi(currentMessages, systemPrompt)
let toolUseBlocks: ToolUse[] = [] // ← 我们的行 2 (变体)
for await (const event of stream) { // ← 我们的行 3
if (isToolUseEvent(event)) { // ← 我们的行 4
toolUseBlocks.push(parseToolUse(event))
}
yield event // ← 我们的行 5
}
if (toolUseBlocks.length === 0) {
return // LLM 结束
}
// 执行工具
for (const toolUse of toolUseBlocks) {
const result = await executeTool(toolUse, context, canUseTool)
currentMessages = [...currentMessages, result]
}
// 重试 / 限流
attempt++
}
}
5 行核心一模一样!其他都是工程化扩展: - 重试(withRetry) - 限流(policy limits) - 压缩(autoCompact) - 错误处理
步骤 5:测试各种边界¶
测试 1:基本流式¶
async function testBasic() {
const messages: Message[] = [{ role: 'user', content: 'hello' }]
const received: StreamEvent[] = []
for await (const event of agent(messages)) {
received.push(event)
}
// 期望:text_delta 'You said: ' + 'hello' + done
expect(received).toEqual([
{ type: 'text_delta', text: 'You said: ' },
{ type: 'text_delta', text: 'hello' },
{ type: 'done' },
])
}
测试 2:工具调用¶
async function testToolUse() {
const messages: Message[] = [{ role: 'user', content: 'list' }]
const received: StreamEvent[] = []
for await (const event of agent(messages)) {
received.push(event)
}
// 期望:text + tool_use + tool_result text
expect(received).toContainEqual({ type: 'text_delta', text: 'I will list files. ' })
expect(received).toContainEqual({ type: 'tool_use', tool: { name: 'Bash', input: { cmd: 'ls' } } })
expect(received).toContainEqual({ type: 'text_delta', text: expect.stringContaining('file1.txt') })
}
测试 3:消费者 break 取消¶
async function testCancel() {
const messages: Message[] = [{ role: 'user', content: 'hello' }]
let count = 0
for await (const event of agent(messages)) {
count++
if (count === 1) break // 立刻取消
}
// 期望:只处理了第一个 event
expect(count).toBe(1)
}
测试 4:异步生成器的"边迭代边修改"行为¶
it('async generator 反映当前 Set 内容', () => {
// 在 listener 内 add 新 listener 的情况
// 我们的 agent 循环里没用到 listeners,但 store.ts 测试里需要
// (见 walkthrough/handwrite-store.md)
})
常见错误¶
错误 1:先 await 再 yield(丢流式)¶
// ❌ 错的
async function* bad(messages) {
const all = await mockLLM(messages) // ❌ 一次性消费整个流
for (const event of all) yield event // ❌ 不再是流
}
// ✅ 对的
async function* good(messages) {
for await (const event of mockLLM(messages)) { // ✅ 边 await 边 yield
yield event
}
}
错误 2:忘记更新 messages 导致死循环¶
// ❌ 错的
async function* bad(messages) {
while (true) {
for await (const event of mockLLM(messages)) yield event
// ❌ 忘更新 messages,LLM 永远返回同样内容
}
}
// ✅ 对的
async function* good(messages) {
while (true) {
for await (const event of mockLLM(messages)) {
if (event.type === 'tool_use') {
const result = await mockTool(event.tool)
messages = [...messages, { role: 'user', content: result }] // ✅ 更新
} else {
yield event
}
}
if (noToolUse) return
}
}
错误 3:把 tool_result 拼到错误位置¶
// ❌ 错的:拼到 messages 前面
messages = [{ role: 'user', content: result }, ...messages]
// ✅ 对的:拼到末尾(LLM 才能看到"上文")
messages = [...messages, { role: 'user', content: result }]
进阶:扩展到 Claude Code 真实复杂度¶
async function* queryLoop(messages, ctx, canUseTool, options) {
let currentMessages = messages
let attempt = 0
while (true) {
// 1. ⭐ token 警告检查
if (calculateTokenWarningState(currentMessages) === 'auto_compact') {
currentMessages = await runAutoCompact(currentMessages)
}
// 2. ⭐ 流式调 API
let toolUseBlocks = []
let responseText = ''
for await (const event of callApiWithRetry(currentMessages, options)) {
if (event.type === 'text_delta') responseText += event.text
if (event.type === 'tool_use') toolUseBlocks.push(event.tool)
yield event
}
// 3. ⭐ 无 tool → 结束
if (toolUseBlocks.length === 0) return
// 4. ⭐ 执行所有 tool(带权限检查)
for (const tool of toolUseBlocks) {
const decision = await canUseTool(tool, ctx)
if (decision.behavior === 'deny') {
currentMessages = [...currentMessages, denialMessage(tool)]
continue
}
if (decision.behavior === 'ask') {
yield { type: 'permission_request', tool }
// 等用户输入...
}
const result = await executeTool(tool, ctx)
currentMessages = [...currentMessages, result]
}
// 5. ⭐ 继续
attempt++
if (attempt > MAX_ATTEMPTS) {
throw new Error('Max attempts reached')
}
}
}
核心 5 行 + 工程化扩展 5 段。
关键洞察¶
1. async function* 是 Claude Code 整个流式架构的底座¶
5 行看懂全部 50 万行代码的"循环"逻辑。
2. for-await + yield 的"边生产边消费"是核心¶
不能"先全部 await 再 yield"。
不能"yield 整个结果"。
必须"yield 每个 event"。
3. 死循环的"终止条件"是关键¶
我们的版本:if (!toolUse) return
Claude Code 版本:if (finish_reason === 'end_turn') return
4. "更新 messages + 继续" 是循环推进器¶
没更新 = 死循环。
更新错了位置 = LLM 看错上文。
配套资源¶
- 真实源码:
src/query.ts:241(queryLoop) - 真实状态机:data/state-machines.md 第 1 节
- 真实时序:data/sequence-diagrams.md 第 1 节