阶段 6 | Agent 循环 + 流式 API¶
目标:理解 Claude Code 的"思考 → 行动 → 观察"循环怎么从前端代码角度实现 —— 流式响应解析、工具调用、错误重试、上下文压缩、多 agent 协调。 时长:2~3 天 前端类比:聊天 IM 客户端的核心循环 —— WebSocket 收消息 → 解析 → 渲染 → 发消息。
6.1 三大件¶
src/query.ts 1729 行 query() + queryLoop() ← 核心循环
src/QueryEngine.ts 1295 行 class QueryEngine + ask() ← 状态机封装
src/services/api/claude.ts 3419 行 API 客户端(SDK 包装 + 流处理) ← 传输层
⚠️ 这三个文件任何一个都 > 1000 行,别想顺序读完。从接口和类型入手。
6.2 核心循环:query.ts 的 async function* query()¶
// src/query.ts:219
export async function* query(
messages: Message[],
systemPrompt: SystemPrompt,
context: ToolUseContext,
canUseTool: CanUseToolFn,
options: QueryOptions
): AsyncGenerator<StreamEvent, void, void>
// src/query.ts:241
async function* queryLoop(
// ... 同 query 但内部循环
): AsyncGenerator<StreamEvent, void, void>
6.2.1 关键设计:异步生成器¶
async function* + yield 是 Claude Code 流式架构的基石:
async function* query(...) {
while (true) {
// 1. 构造请求
const stream = await makeStreamRequest(...)
// 2. 逐 token 解析
for await (const event of stream) {
yield event // 转发每个 stream event
}
// 3. 检查是否需要继续
if (shouldTerminate(state)) break
// 4. 准备下一轮
}
}
为什么用 async function* 而不是 Promise?
| 模式 | 能力 |
|---|---|
Promise<T> |
一次返回,无法中途消费 |
EventEmitter |
多消费者、但缺 backpressure |
Observable(RxJS) |
强大但重、学习曲线 |
async function*(TC39) |
标准、backpressure 友好、可被 for await 消费、可被 for await break 取消 |
前端类比:和 React Server Components 的 streaming、fetch 的 ReadableStream、WebSocket 的 message event 是同种思想。Claude Code 选 async function* 是最符合 JavaScript 标准的方式。
6.2.2 queryLoop 的核心职责¶
观察 1729 行的 query.ts,循环内部必做的事:
- 构造 messages 数组 —— 加入 system prompt、tool results、上轮 assistant 响应
- 发起 API 请求 ——
services/api/claude.ts的流式调用 - 解析 stream 事件:
message_start→ 标记新消息开始content_block_start→ 文本/tool_use block 开始content_block_delta→ 流式更新(每个 token)content_block_stop→ block 结束message_delta→ finish_reason、usage 统计message_stop→ 整条消息结束- 检查 tool_use:如果 assistant 返回了 tool_use,准备执行
- 执行工具(带权限检查):
canUseTool(tool, input)→ 用户授权 → 执行 - 工具结果回填:构造 tool_result message,拼到 messages 末尾
- 检查 finish_reason:
end_turn→ 用户完成,breaktool_use→ 工具调用后继续循环max_tokens/stop_sequence→ 异常处理- 错误重试:
withRetry.ts处理可重试错误(429、5xx) - 上下文压缩(如果 token 接近上限):
compact/autoCompact.ts - 回到步骤 1(如果还需要继续)
6.3 状态机封装:QueryEngine class¶
// src/QueryEngine.ts:184
export class QueryEngine {
// 内部状态
// 事件源
// 重试策略
// 配额追踪
// 公开方法
async *ask(messages, options): AsyncGenerator<...>
}
6.3.1 为什么用 class 不用函数?¶
QueryEngine 内部维护跨调用的状态:
- 重试计数器
- 上次 API 响应时间
- 配额消耗追踪
- 限流 backoff 状态
- 链式调用追踪(QueryChainTracking)
前端类比:和"长连接客户端"的封装方式一样 —— IM 客户端、GraphQL Client、WebSocket Client 通常都用 class。
6.3.2 ask() 方法签名¶
ask() 是更上层的 API,封装"一次完整的多轮对话"。可能在内部多次调用 query()。
6.4 传输层:services/api/claude.ts¶
3419 行 —— 整个项目第二大文件(仅次于 REPL.tsx 的 5005 行) 角色:Anthropic API SDK 的完整封装
6.4.1 关键 import 透出的能力¶
观察 claude.ts 头部 import:
| 导入 | 作用 |
|---|---|
@anthropic-ai/sdk/resources/beta/messages/messages.mjs |
Beta API(流式、thinking、tools) |
utils/model/providers.js |
多种 API provider:Anthropic 直连 / AWS Bedrock / GCP Vertex |
constants/system.js |
System prompt 前缀、attribution header |
utils/auth.js |
凭据管理:API key / OAuth / AWS creds / GCP creds |
Tool.js |
工具接口 |
tasks/... |
任务协调 |
关键洞察 1:Claude Code 不只接 Anthropic 官方 API —— 还支持 AWS Bedrock、GCP Vertex、first-party OAuth (claude.ai 订阅)。这是企业级部署的标配。
关键洞察 2:3419 行的封装说明 Claude Code 用了大量 beta API 能力(extended thinking、prompt caching、tool use、message batches...)。这是"吃透 SDK"的活儿。
6.4.2 SDK 客户端构造:services/api/client.ts¶
// src/services/api/client.ts
import Anthropic, { type ClientOptions } from '@anthropic-ai/sdk'
import { getProxyFetchOptions } from 'src/utils/proxy.js'
// 4 种认证方式
const apiKey = await getAnthropicApiKey()
const oauthTokens = await getClaudeAIOAuthTokens()
const awsCreds = await refreshAndGetAwsCredentials()
const gcpCreds = await refreshGcpCredentialsIfNeeded()
// 根据 provider 选 ClientOptions
let options: ClientOptions
if (isFirstPartyAnthropicBaseUrl()) {
options = { apiKey, authToken, ...getProxyFetchOptions() }
} else if (isAwsBedrock()) {
options = { awsCreds, region, ... }
} else if (isGcpVertex()) {
options = { gcpCreds, ... }
}
前端类比:和"API client 工厂"模式一样 —— 根据环境变量选不同的后端。
6.5 重试与限流¶
核心文件: -
src/services/api/withRetry.ts-src/services/api/errors.ts-src/services/api/errorUtils.ts-src/services/api/claudeAiLimits.ts+claudeAiLimitsHook.ts-src/services/policyLimits/index.ts(企业策略限制)
6.5.1 重试策略¶
// withRetry.ts 推测导出
export async function* withRetry<T>(
fn: () => AsyncGenerator<T>,
classifyError: (err) => RetryableError | null
): AsyncGenerator<T>
categorizeRetryableAPIError() 区分:
- 可重试:429(限流)、5xx(服务端错误)、网络断开
- 不可重试:400(请求无效)、401(认证失败)、402(欠费)
6.5.2 限流消息¶
rateLimitMessages.ts 提供了人类可读的限流提示:
claudeAiLimits.ts 追踪claude.ai 订阅用户的剩余配额。
6.5.3 企业策略¶
policyLimits.ts 处理企业 IT 设置的限额 —— 老板可以限制员工每天用 Claude Code 的次数。
6.6 上下文压缩系统¶
核心目录:
src/services/compact/10 个文件 —— 大模型对话的"GC"
6.6.1 压缩类型¶
| 文件 | 触发条件 | 策略 |
|---|---|---|
autoCompact.ts |
token 接近上限(90%?) | 整段对话摘要 |
microCompact.ts |
局部 token 累积 | 只压缩工具结果 |
apiMicrocompact.ts |
API 级别的 microCompact | 由 SDK 支持 |
reactiveCompact.ts |
(Ant-only) 反应式压缩 | DCE 门控 |
contextCollapse.ts |
(Ant-only) 上下文坍缩 | DCE 门控 |
compact.ts |
手动 /compact 命令 |
用户主动 |
compactWarningHook.ts |
即将压缩时警告 | hook 钩子 |
compactWarningState.ts |
警告状态 | UI 状态 |
sessionMemoryCompact.ts |
压缩到 session memory | 跨会话记忆 |
grouping.ts |
消息分组(决定压缩哪几段) | 算法 |
timeBasedMCConfig.ts |
基于时间的 microCompact 配置 | 配置 |
postCompactCleanup.ts |
压缩后清理 | 资源释放 |
关键洞察:压缩是分层策略: - microCompact:每次工具结果太大就压缩(高频、轻量) - autoCompact:累计到阈值就整段摘要(中频、重量) - reactiveCompact:动态判断(中频、智能) - sessionMemoryCompact:跨会话记忆(异步、长期)
前端类比:和"前端性能优化"分层的思路一样 —— 微任务(debounce/throttle)、长任务(virtualization)、长期优化(lazy load)。
6.6.2 压缩的挑战¶
压缩的本质是"丢信息":
- 哪些信息可以丢?grouping.ts 决定
- 压缩比例多少?timeBasedMCConfig.ts 配置
- 什么时候提醒用户?compactWarningHook.ts 触发
- 怎么让 LLM 知道压缩发生了?buildPostCompactMessages()(query.ts 用)
6.7 多 Agent 协调:src/tasks/¶
核心:Claude Code 的"多 agent swarm"模式
6.7.1 任务类型¶
| 任务类型 | 文件 | 角色 |
|---|---|---|
LocalMainSessionTask |
LocalMainSessionTask.ts |
主会话任务 |
LocalAgentTask/ |
子目录 | 本地子 agent |
RemoteAgentTask/ |
子目录 | 远程 agent |
InProcessTeammateTask/ |
子目录 | 进程内队友 agent |
LocalShellTask/ |
子目录 | 本地 shell 长跑任务 |
DreamTask/ |
子目录 | 后台 dream 任务 |
stopTask.ts |
停止任务的工具 | 任务管理 |
6.7.2 Task 抽象¶
// src/Task.ts 头部
export type TaskType =
| 'local_bash'
| 'local_agent'
| 'remote_agent'
| 'in_process_teammate'
| 'local_workflow'
| 'monitor_mcp'
| 'dream'
export type TaskStatus =
| 'pending'
| 'running'
| 'completed'
| 'failed'
| 'killed'
export function isTerminalTaskStatus(status: TaskStatus): boolean
关键设计:
- 统一 Task 抽象 —— 不管什么任务都映射到 TaskType + TaskStatus
- isTerminalTaskStatus —— 状态机保护,避免向死任务注入消息
- TaskHandle —— cleanup 函数,和 React useEffect cleanup 同形
6.7.3 TaskState 形状¶
// src/tasks/types.ts
export type TaskState = LocalAgentTaskState | InProcessTeammateTaskState | ...
TypeScript 联合类型 + 类型守卫:
function isInProcessTeammateTask(task: TaskState): task is InProcessTeammateTaskState {
return task.type === 'in_process_teammate'
}
前端类比:和 Redux 的 normalized state + selectors 同种模式。
6.8 Coordinator 模式¶
核心:
src/coordinator/
coordinatorMode.ts 推测是多 agent 协调的状态机 —— 决定谁是 leader、谁负责什么、消息怎么路由。
多 agent 协调的关键问题:
1. 任务分发 —— SendMessageTool 把消息发给谁?
2. 状态同步 —— TeamCreateTool / TeamDeleteTool 怎么管理队伍?
3. 退出策略 —— useTeammateViewAutoExit 何时自动退出队友视图?
4. 权限冲突 —— utils/swarm/leaderPermissionBridge.ts 处理 leader 的工具权限代理
6.9 关键洞察¶
6.9.1 异步生成器是 Claude Code 的"反应式编程底座"¶
async function* 同时被 query、QueryEngine、tool.call、API client 使用 —— 这是项目的"惯用法"。理解了它,CLAUDE Code 60% 的代码就能读懂。
6.9.2 状态机分两层¶
- QueryEngine(class):单次对话 的状态机(重试、配额、链路追踪)
- TaskState(union + 守卫):多 agent / 后台任务 的状态机
两层不冲突,各管各的领域。
6.9.3 压缩系统的复杂度反推 LLM 限制¶
compact/ 10 个文件说明 token 上限是个真问题。
学习时关注:
- 怎么算"快满了"(isAutoCompactEnabled、calculateTokenWarningState)
- 怎么压缩(不同策略的 trade-off)
- 怎么告诉 LLM "上下文变小了"(buildPostCompactMessages)
6.9.4 流式 API 的"语义边界"¶
Claude Code 把流式响应解析成结构化事件(StreamEvent),然后在 query.ts 里逐事件 yield。
关键:yield 给消费方的是已解析的语义事件,不是原始 SSE 字符串。
前端类比:和 GraphQL 客户端的"cache normalized response"是同种"反序列化"思路。
6.10 阅读清单¶
- ✅
src/query.ts:1-100(imports + 类型)—— 看流式事件类型 - ✅
src/query.ts:219-280(query()函数签名 +queryLoop开头)—— 看循环骨架 - ✅
src/QueryEngine.ts:184-280(class 定义 + 状态)—— 看状态机 - ✅
src/services/api/claude.ts:1-50(imports)—— 看 API 维度(Beta / Bedrock / Vertex) - ✅
src/services/api/withRetry.ts—— 重试策略 - 🔍
src/services/compact/autoCompact.ts—— 自动压缩触发逻辑 - 🔍
src/Task.ts(全文)—— 任务抽象 - 🔍
src/tasks/LocalAgentTask/(选一个看)—— 具体任务实现 - 📌
src/coordinator/coordinatorMode.ts—— 协调器
6.11 练习任务¶
- 手写一个最小 agent 循环 —— 5 行的
async function*循环:调 API → 解析 tool_use → 模拟执行 → 拼结果 → 继续 - 列出 query.ts 里的所有 yield 点(
grep -n "yield" src/query.ts),理解每个 yield 触发什么 UI 渲染 - 设计压缩策略对比表 —— microCompact / autoCompact / reactiveCompact / contextCollapse 在"何时触发、压缩什么、压缩后怎么注入 LLM"三个维度的差异
- 思考:如果让你把
async function*改成 RxJS Observable,会牺牲什么?得到什么?Claude Code 选async function*的理由是什么?
6.12 下一步¶
进入 阶段 7:高级系统 —— MCP、plugins、skills、bridge、voice、vim、native 桥接的专题。