Topic | async function* 异步生成器:Claude Code 的反应式底座¶
重要性:⭐⭐⭐⭐⭐(核心惯用法) 出现位置:src/query.ts、src/QueryEngine.ts、src/services/api/claude.ts、src/Tool.ts 的
tool.call()、src/services/mcp/MCPConnectionManager.tsx、几乎所有 I/O 密集模块 关联:phase-05-tools.md 的async *call、phase-06-agent-loop.md 的 query 循环、phase-07-advanced.md 的 MCP transport、glossary
1. 为什么 Claude Code 选 async function*¶
Claude Code 几乎所有"长流程异步操作"都用了 async function* 异步生成器。这不是偶然 —— 这是对 Promise / EventEmitter / Observable / Callback 四个候选方案的深思熟虑后的选择。
| 方案 | 优点 | 缺点 | Claude Code 用吗 |
|---|---|---|---|
Promise<T> |
简单、TypeScript 友好 | 只能一次返回 | 极少(仅一次性 IO) |
EventEmitter |
多消费者、解耦 | 无 backpressure、无取消 | 内部使用(src/ink/events/emitter.ts) |
| Callback | 最灵活 | 回调地狱 | N-API 集成(vendor/) |
Observable (RxJS) |
强大、算子丰富 | 重(~50KB+)、学习曲线陡 | ❌ |
async function* |
标准、backpressure 友好、可被 for await 消费、可被 for await break 取消 | 单消费者 | ✅ 全栈用 |
关键洞察:async function* + for await...of 的组合:
- for await (const e of gen) —— 消费所有事件
- for await (const e of gen) { break } —— 中途取消(生成器的 return 被调用)
- 隐式 backpressure —— 消费者慢,生产者就 yield 慢(事件循环调度)
- 标准、零依赖 —— 不需要 RxJS
2. 标准模式:async function* 模板¶
2.1 简单模板¶
async function* watchFile(path: string): AsyncGenerator<FileEvent> {
const watcher = fs.watch(path)
try {
for await (const event of watcher) {
yield { type: 'change', path, event }
}
} finally {
watcher.close()
}
}
三要素:
1. async function* —— 声明异步生成器
2. yield —— 把值推给消费者
3. finally 清理 —— 消费者 break 时会自动调用生成器的 return,触发 finally
2.2 跨 yield 的状态保持¶
async function* session(): AsyncGenerator<SessionEvent> {
const state = { messages: [] } // 在生成器内、yield 之间保持
yield { type: 'start', state }
for (const input of inputs) {
const response = await callApi(input, state)
state.messages.push(response) // 状态持续累积
yield { type: 'response', response }
}
yield { type: 'end', totalMessages: state.messages.length }
}
和 Promise 的根本差异:Promise 内的局部变量在 await 后可以保持(闭包),但Promise 本身只能 resolve 一次。生成器可以多次 yield + 跨 yield 保持状态。
2.3 错误处理¶
async function* robustStream(): AsyncGenerator<Data> {
try {
while (true) {
try {
const data = await fetchNext()
yield data
} catch (err) {
if (isRetryable(err)) {
yield { type: 'retry', err } // 把错误当事件
continue
}
throw err // 真正错误向上冒泡
}
}
} finally {
await cleanup()
}
}
两种错误策略:
- 错误当事件(yield { type: 'error', err })—— 消费者决定怎么处理
- 错误向上冒泡(throw)—— 终止生成器、消费者收到异常
Claude Code 两种都用(见第 5 节 query.ts 例子)。
3. Claude Code 实际用法¶
3.1 src/query.ts:219 —— query() 主循环¶
// 简化版(实际 1729 行)
export async function* query(
messages: Message[],
systemPrompt: SystemPrompt,
context: ToolUseContext,
canUseTool: CanUseToolFn,
options: QueryOptions
): AsyncGenerator<StreamEvent, void, void> {
let currentMessages = messages
let attempt = 0
while (true) {
// 1. 构造请求
const stream = await callApi(currentMessages, systemPrompt)
// 2. 逐 token 解析 + 透传
let responseMessages: Message[] = []
for await (const event of stream) {
yield event // 透传给消费者
if (isMessageComplete(event)) {
responseMessages.push(extractMessage(event))
}
}
// 3. 检查是否需要继续
const toolUseBlocks = responseMessages.flatMap(m => m.toolUseBlocks)
if (toolUseBlocks.length === 0) {
return // 用户完成,结束
}
// 4. 执行工具
for (const toolUse of toolUseBlocks) {
const result = await executeTool(toolUse, context, canUseTool)
currentMessages = [...currentMessages, result]
yield { type: 'tool_result', result } // 工具结果也作为事件
}
// 5. 重试 / 限流 / 压缩(在 queryLoop 里)
attempt++
}
}
关键观察:
- 每条 stream event 都 yield 出去(最高保真度)
- 工具结果也作为事件 yield(消费者可以看到完整过程)
- 没有"结束"标志 —— return 自然结束(消费者 for-await 结束)
- 重试 / 限流包在 queryLoop 里(外层 queryLoop 也是 async generator)
3.2 src/QueryEngine.ts:184 —— class QueryEngine + ask()¶
// 简化版(实际 1295 行)
export class QueryEngine {
private retryCount = 0
private totalCost = 0
private queryChain: QueryChainTracking
async *ask(messages: Message[]): AsyncGenerator<AskEvent> {
// 1. 准备
const engineState = this.prepareState()
// 2. 调底层 query() —— 透传
for await (const event of this.query(messages, engineState)) {
// 3. 中间状态追踪
if (event.type === 'usage') {
this.totalCost += event.usage.cost
this.queryChain.recordUsage(event.usage)
}
// 4. 透传
yield event
// 5. 可选:消费者 break 时,query() 的 for-await 也会 break
// → query() 内部 finally 清理会自动跑
}
}
}
设计模式:
- Class 封装跨调用状态(retryCount、totalCost)
- ask() 方法本身就是 async generator —— 调用方可以 for await 消费
- 透传 + 拦截 —— 透传底层 event,同时在中间插入状态追踪
3.3 src/Tool.ts —— tool.call() 的 async generator¶
// 推测的 tool.call 模式(基于注释和实践)
class BashTool {
async *call(
input: BashInput,
ctx: ToolUseContext
): AsyncGenerator<ToolEvent, ToolResult> {
// 1. 校验
const validation = this.validate(input)
if (!validation.ok) {
yield { type: 'validation_error', errors: validation.errors }
return { content: validation.errorMessage, is_error: true }
}
// 2. 权限检查
const decision = await checkPermission(this.name, input, ctx)
if (decision === 'deny') {
return { content: 'Denied by user', is_error: true }
}
// 3. 启动
yield { type: 'start', command: input.command }
// 4. 跑命令(流式输出)
const subprocess = spawn(input.command)
try {
for await (const chunk of subprocess.stdout) {
yield { type: 'stdout', data: chunk } // 实时反馈
}
const exitCode = await subprocess.waitExit()
yield { type: 'exit', exitCode }
// 5. 返回最终结果
return { content: 'Command completed', is_error: exitCode !== 0 }
} finally {
subprocess.kill() // 消费者 break 时清理
}
}
}
核心模式:
- 多次 yield 进度 + 最终 return 结果
- finally 清理 subprocess —— 关键
- 消费者 break 触发 return —— 资源自动释放
3.4 src/services/api/claude.ts —— 包装 SDK¶
// 简化版
export async function* streamApi(
params: MessageStreamParams
): AsyncGenerator<StreamEvent> {
const client = getClient()
const stream = await client.messages.stream(params)
try {
for await (const event of stream) {
// 转换 SDK 事件为内部事件
yield transformEvent(event)
}
} catch (err) {
// SDK 错误统一处理
yield { type: 'api_error', error: err }
throw err
}
}
Adapter 模式:
- 输入:Anthropic SDK 的事件流
- 输出:Claude Code 内部统一的 StreamEvent 枚举
- 好处:上层代码不依赖具体 SDK,未来换 SDK 不影响业务代码
4. 消费者侧的用法¶
4.1 REPL.tsx 怎么消费¶
// 简化版(基于 REPL.tsx 5005 行的实际模式)
function REPL() {
const [messages, setMessages] = useState<Message[]>([])
const handleSubmit = async (input: string) => {
setMessages(prev => [...prev, createUserMessage(input)])
// 调 agent 循环
const engine = new QueryEngine(...)
for await (const event of engine.ask(messages)) {
switch (event.type) {
case 'content_delta':
// 流式更新最后一条 assistant message
setMessages(prev => updateLastAssistant(prev, event.delta))
break
case 'tool_use':
setMessages(prev => [...prev, event.toolUseMessage])
break
case 'tool_result':
setMessages(prev => [...prev, event.resultMessage])
break
// ...
}
}
}
return <Messages messages={messages} />
}
关键模式:
- for await 在事件循环回调里 —— React 18 自动 batching
- 每个 event 触发 setState —— 流式更新
- 循环结束 = 对话完成
4.2 中途取消(AbortController 配合)¶
// 简化版
async function cancelableQuery() {
const controller = new AbortController()
const gen = query(messages, systemPrompt, context, canUseTool, {
signal: controller.signal,
})
// 用户按 Ctrl+C → controller.abort()
setTimeout(() => controller.abort(), 30000)
try {
for await (const event of gen) {
if (event.type === 'content_delta') {
// 检查 signal
if (controller.signal.aborted) break
yield event
}
}
} finally {
// 清理
}
}
两个层次的取消:
1. controller.abort() —— 信号传到底层,HTTP 请求中断、subprocess kill
2. for await...break —— 只停止消费,不立即终止生成器(等下一个 yield 检查)
Claude Code 两者都用:AbortController 用于硬中断,break 用于软取消。
5. 性能模式¶
5.1 Module-level 缓存¶
async function* 本身不能缓存(每次调用都新建),但包装它的函数可以:
// 简化版 Markdown.tsx 的 token cache
const tokenCache = new Map<string, Token[]>()
async function* parseMarkdown(text: string): AsyncGenerator<Token> {
if (tokenCache.has(text)) {
for (const token of tokenCache.get(text)!) yield token
return
}
// ... 实际解析 + 缓存
}
为什么用 module-level Map 而不是 useMemo:
- useMemo 在 React unmount 时丢失
- async function* 的生命周期独立于 React
- 跨组件、跨 session 的缓存只能放 module-level
5.2 Backpressure 利用¶
// 慢消费者
for await (const event of fastProducer()) {
await renderSlowly(event) // 渲染比生产慢
// ↑ producer 会在 yield 后阻塞,等下一次 next() 调用
// ↑ 这就是天然的 backpressure
}
前端类比:和 Web Streams API 的 ReadableStream 是同种"消费者慢 → 生产者暂停"机制。
5.3 Lazy iteration¶
// ❌ 错的:先全部生产再消费
const events = []
for await (const e of gen) events.push(e) // 内存爆炸
for (const e of events) handle(e)
// ✅ 对的:流式生产-消费
for await (const e of gen) {
handle(e) // 一次只处理一个
}
6. 反模式(Claude Code 怎么避免)¶
6.1 不要"先 await 再 yield"¶
// ❌ 错的:先全部 await,再 yield(丢失流式)
async function* bad() {
const all = await fetchAll() // 一次性
for (const item of all) yield item // 已经不是流了
}
// ✅ 对的:边 await 边 yield
async function* good() {
const stream = getStream()
for await (const item of stream) {
yield item // 真正流式
}
}
6.2 不要在生成器外共享可变状态¶
// ❌ 错的:生成器外状态被并发消费者影响
let counter = 0
async function* bad() {
counter++ // 多消费者并发调用会互相影响
yield counter
}
// ✅ 对的:状态封装在生成器内
async function* good(id: string) {
const counter = { local: 0 }
counter.local++
yield counter.local
}
6.3 不要忘记 finally¶
// ❌ 错的:消费者 break 时资源泄露
async function* bad(path: string) {
const handle = await openFile(path)
for await (const chunk of handle) yield chunk
// 没人 close!
}
// ✅ 对的:finally 兜底
async function* good(path: string) {
const handle = await openFile(path)
try {
for await (const chunk of handle) yield chunk
} finally {
await handle.close()
}
}
7. 关键洞察¶
7.1 整个项目的"心脏"¶
async function* 同时被 query、QueryEngine、tool.call、API client、MCP transport 使用 —— 这是项目的"惯用法"。理解了它,Claude Code 60% 的核心代码就能读懂。
7.2 "async function* + 透传" 是组合性的¶
每一层都是 async generator,消费者可以插入任意中间层(添加 retry、metrics、过滤)。这是函数式组合和面向对象继承的根本差异 —— 选前者更灵活。
7.3 "yield 当事件 + return 当结果" 的统一性¶
Claude Code 的所有"长流程"都遵循这个模式: - 多次 yield = 进度/中间状态 - 最后 return = 最终结果 - throw = 错误终止
前端类比:和 React Suspense 的"throw promise"是同种"lazy evaluation"思路的兄弟。
7.4 "backpressure 来自事件循环"¶
不需要复杂的流控库 —— JavaScript 单线程 + 事件循环 天然 backpressure。这是 async function* 比 callback 更优雅的关键。
8. 阅读清单¶
- ✅
src/query.ts:219-280—— query() 主循环 - ✅
src/QueryEngine.ts:184-280—— class 状态机 - ✅
src/services/api/claude.ts:1-80—— SDK 包装 - ✅
src/Tool.ts:783-792—— buildTool 工厂(推测 call() 签名) - 📌
src/services/mcp/MCPConnectionManager.tsx—— 看 MCP 怎么用 async generator* - 📌
src/utils/attachments.ts:3997 行—— 附件处理的最大生成器用法
9. 练习任务¶
- 手写一个 async generator 包装一个 5 秒的 setTimeout 序列,验证 for-await break 能取消
- 实现一个 LRU token cache 包在 async generator 外面
- 测试 backpressure:写两个 for-await 循环,一个快一个慢,观察慢消费者的内存增长
- 思考:如果 Claude Code 改用 RxJS,哪些代码会变短?哪些会变长?项目作者为什么选 async function*?