跳转至

Topic | async function* 异步生成器:Claude Code 的反应式底座

重要性:⭐⭐⭐⭐⭐(核心惯用法) 出现位置:src/query.ts、src/QueryEngine.ts、src/services/api/claude.ts、src/Tool.ts 的 tool.call()、src/services/mcp/MCPConnectionManager.tsx、几乎所有 I/O 密集模块 关联phase-05-tools.md 的 async *callphase-06-agent-loop.md 的 query 循环phase-07-advanced.md 的 MCP transportglossary


1. 为什么 Claude Code 选 async function*

Claude Code 几乎所有"长流程异步操作"都用了 async function* 异步生成器。这不是偶然 —— 这是对 Promise / EventEmitter / Observable / Callback 四个候选方案的深思熟虑后的选择。

方案 优点 缺点 Claude Code 用吗
Promise<T> 简单、TypeScript 友好 只能一次返回 极少(仅一次性 IO)
EventEmitter 多消费者、解耦 无 backpressure、无取消 内部使用(src/ink/events/emitter.ts
Callback 最灵活 回调地狱 N-API 集成(vendor/)
Observable (RxJS) 强大、算子丰富 (~50KB+)、学习曲线陡
async function* 标准、backpressure 友好、可被 for await 消费、可被 for await break 取消 单消费者 全栈用

关键洞察async function* + for await...of 的组合: - for await (const e of gen) —— 消费所有事件 - for await (const e of gen) { break } —— 中途取消(生成器的 return 被调用) - 隐式 backpressure —— 消费者慢,生产者就 yield 慢(事件循环调度) - 标准、零依赖 —— 不需要 RxJS

2. 标准模式:async function* 模板

2.1 简单模板

async function* watchFile(path: string): AsyncGenerator<FileEvent> {
  const watcher = fs.watch(path)
  try {
    for await (const event of watcher) {
      yield { type: 'change', path, event }
    }
  } finally {
    watcher.close()
  }
}

三要素: 1. async function* —— 声明异步生成器 2. yield —— 把值推给消费者 3. finally 清理 —— 消费者 break 时会自动调用生成器的 return,触发 finally

2.2 跨 yield 的状态保持

async function* session(): AsyncGenerator<SessionEvent> {
  const state = { messages: [] }  // 在生成器内、yield 之间保持
  yield { type: 'start', state }

  for (const input of inputs) {
    const response = await callApi(input, state)
    state.messages.push(response)  // 状态持续累积
    yield { type: 'response', response }
  }

  yield { type: 'end', totalMessages: state.messages.length }
}

和 Promise 的根本差异:Promise 内的局部变量在 await可以保持(闭包),但Promise 本身只能 resolve 一次。生成器可以多次 yield + 跨 yield 保持状态

2.3 错误处理

async function* robustStream(): AsyncGenerator<Data> {
  try {
    while (true) {
      try {
        const data = await fetchNext()
        yield data
      } catch (err) {
        if (isRetryable(err)) {
          yield { type: 'retry', err }  // 把错误当事件
          continue
        }
        throw err  // 真正错误向上冒泡
      }
    }
  } finally {
    await cleanup()
  }
}

两种错误策略: - 错误当事件yield { type: 'error', err })—— 消费者决定怎么处理 - 错误向上冒泡throw)—— 终止生成器、消费者收到异常

Claude Code 两种都用(见第 5 节 query.ts 例子)。

3. Claude Code 实际用法

3.1 src/query.ts:219 —— query() 主循环

// 简化版(实际 1729 行)
export async function* query(
  messages: Message[],
  systemPrompt: SystemPrompt,
  context: ToolUseContext,
  canUseTool: CanUseToolFn,
  options: QueryOptions
): AsyncGenerator<StreamEvent, void, void> {
  let currentMessages = messages
  let attempt = 0

  while (true) {
    // 1. 构造请求
    const stream = await callApi(currentMessages, systemPrompt)

    // 2. 逐 token 解析 + 透传
    let responseMessages: Message[] = []
    for await (const event of stream) {
      yield event  // 透传给消费者
      if (isMessageComplete(event)) {
        responseMessages.push(extractMessage(event))
      }
    }

    // 3. 检查是否需要继续
    const toolUseBlocks = responseMessages.flatMap(m => m.toolUseBlocks)
    if (toolUseBlocks.length === 0) {
      return  // 用户完成,结束
    }

    // 4. 执行工具
    for (const toolUse of toolUseBlocks) {
      const result = await executeTool(toolUse, context, canUseTool)
      currentMessages = [...currentMessages, result]
      yield { type: 'tool_result', result }  // 工具结果也作为事件
    }

    // 5. 重试 / 限流 / 压缩(在 queryLoop 里)
    attempt++
  }
}

关键观察: - 每条 stream event 都 yield 出去(最高保真度) - 工具结果也作为事件 yield(消费者可以看到完整过程) - 没有"结束"标志 —— return 自然结束(消费者 for-await 结束) - 重试 / 限流包在 queryLoop 里(外层 queryLoop 也是 async generator)

3.2 src/QueryEngine.ts:184 —— class QueryEngine + ask()

// 简化版(实际 1295 行)
export class QueryEngine {
  private retryCount = 0
  private totalCost = 0
  private queryChain: QueryChainTracking

  async *ask(messages: Message[]): AsyncGenerator<AskEvent> {
    // 1. 准备
    const engineState = this.prepareState()

    // 2. 调底层 query() —— 透传
    for await (const event of this.query(messages, engineState)) {
      // 3. 中间状态追踪
      if (event.type === 'usage') {
        this.totalCost += event.usage.cost
        this.queryChain.recordUsage(event.usage)
      }

      // 4. 透传
      yield event

      // 5. 可选:消费者 break 时,query() 的 for-await 也会 break
      //    → query() 内部 finally 清理会自动跑
    }
  }
}

设计模式: - Class 封装跨调用状态(retryCount、totalCost) - ask() 方法本身就是 async generator —— 调用方可以 for await 消费 - 透传 + 拦截 —— 透传底层 event,同时在中间插入状态追踪

3.3 src/Tool.ts —— tool.call() 的 async generator

// 推测的 tool.call 模式(基于注释和实践)
class BashTool {
  async *call(
    input: BashInput,
    ctx: ToolUseContext
  ): AsyncGenerator<ToolEvent, ToolResult> {
    // 1. 校验
    const validation = this.validate(input)
    if (!validation.ok) {
      yield { type: 'validation_error', errors: validation.errors }
      return { content: validation.errorMessage, is_error: true }
    }

    // 2. 权限检查
    const decision = await checkPermission(this.name, input, ctx)
    if (decision === 'deny') {
      return { content: 'Denied by user', is_error: true }
    }

    // 3. 启动
    yield { type: 'start', command: input.command }

    // 4. 跑命令(流式输出)
    const subprocess = spawn(input.command)
    try {
      for await (const chunk of subprocess.stdout) {
        yield { type: 'stdout', data: chunk }  // 实时反馈
      }

      const exitCode = await subprocess.waitExit()
      yield { type: 'exit', exitCode }

      // 5. 返回最终结果
      return { content: 'Command completed', is_error: exitCode !== 0 }
    } finally {
      subprocess.kill()  // 消费者 break 时清理
    }
  }
}

核心模式: - 多次 yield 进度 + 最终 return 结果 - finally 清理 subprocess —— 关键 - 消费者 break 触发 return —— 资源自动释放

3.4 src/services/api/claude.ts —— 包装 SDK

// 简化版
export async function* streamApi(
  params: MessageStreamParams
): AsyncGenerator<StreamEvent> {
  const client = getClient()
  const stream = await client.messages.stream(params)

  try {
    for await (const event of stream) {
      // 转换 SDK 事件为内部事件
      yield transformEvent(event)
    }
  } catch (err) {
    // SDK 错误统一处理
    yield { type: 'api_error', error: err }
    throw err
  }
}

Adapter 模式: - 输入:Anthropic SDK 的事件流 - 输出:Claude Code 内部统一的 StreamEvent 枚举 - 好处:上层代码不依赖具体 SDK,未来换 SDK 不影响业务代码

4. 消费者侧的用法

4.1 REPL.tsx 怎么消费

// 简化版(基于 REPL.tsx 5005 行的实际模式)
function REPL() {
  const [messages, setMessages] = useState<Message[]>([])

  const handleSubmit = async (input: string) => {
    setMessages(prev => [...prev, createUserMessage(input)])

    // 调 agent 循环
    const engine = new QueryEngine(...)
    for await (const event of engine.ask(messages)) {
      switch (event.type) {
        case 'content_delta':
          // 流式更新最后一条 assistant message
          setMessages(prev => updateLastAssistant(prev, event.delta))
          break
        case 'tool_use':
          setMessages(prev => [...prev, event.toolUseMessage])
          break
        case 'tool_result':
          setMessages(prev => [...prev, event.resultMessage])
          break
        // ...
      }
    }
  }

  return <Messages messages={messages} />
}

关键模式: - for await 在事件循环回调里 —— React 18 自动 batching - 每个 event 触发 setState —— 流式更新 - 循环结束 = 对话完成

4.2 中途取消(AbortController 配合)

// 简化版
async function cancelableQuery() {
  const controller = new AbortController()
  const gen = query(messages, systemPrompt, context, canUseTool, {
    signal: controller.signal,
  })

  // 用户按 Ctrl+C → controller.abort()
  setTimeout(() => controller.abort(), 30000)

  try {
    for await (const event of gen) {
      if (event.type === 'content_delta') {
        // 检查 signal
        if (controller.signal.aborted) break
        yield event
      }
    }
  } finally {
    // 清理
  }
}

两个层次的取消: 1. controller.abort() —— 信号传到底层,HTTP 请求中断subprocess kill 2. for await...break —— 只停止消费,不立即终止生成器(等下一个 yield 检查)

Claude Code 两者都用:AbortController 用于硬中断,break 用于软取消。

5. 性能模式

5.1 Module-level 缓存

async function* 本身不能缓存(每次调用都新建),但包装它的函数可以

// 简化版 Markdown.tsx 的 token cache
const tokenCache = new Map<string, Token[]>()

async function* parseMarkdown(text: string): AsyncGenerator<Token> {
  if (tokenCache.has(text)) {
    for (const token of tokenCache.get(text)!) yield token
    return
  }
  // ... 实际解析 + 缓存
}

为什么用 module-level Map 而不是 useMemo: - useMemo 在 React unmount 时丢失 - async function* 的生命周期独立于 React - 跨组件、跨 session 的缓存只能放 module-level

5.2 Backpressure 利用

// 慢消费者
for await (const event of fastProducer()) {
  await renderSlowly(event)  // 渲染比生产慢
  // ↑ producer 会在 yield 后阻塞,等下一次 next() 调用
  // ↑ 这就是天然的 backpressure
}

前端类比:和 Web Streams API 的 ReadableStream 是同种"消费者慢 → 生产者暂停"机制。

5.3 Lazy iteration

// ❌ 错的:先全部生产再消费
const events = []
for await (const e of gen) events.push(e)  // 内存爆炸
for (const e of events) handle(e)

// ✅ 对的:流式生产-消费
for await (const e of gen) {
  handle(e)  // 一次只处理一个
}

6. 反模式(Claude Code 怎么避免)

6.1 不要"先 await 再 yield"

// ❌ 错的:先全部 await,再 yield(丢失流式)
async function* bad() {
  const all = await fetchAll()  // 一次性
  for (const item of all) yield item  // 已经不是流了
}

// ✅ 对的:边 await 边 yield
async function* good() {
  const stream = getStream()
  for await (const item of stream) {
    yield item  // 真正流式
  }
}

6.2 不要在生成器外共享可变状态

// ❌ 错的:生成器外状态被并发消费者影响
let counter = 0
async function* bad() {
  counter++  // 多消费者并发调用会互相影响
  yield counter
}

// ✅ 对的:状态封装在生成器内
async function* good(id: string) {
  const counter = { local: 0 }
  counter.local++
  yield counter.local
}

6.3 不要忘记 finally

// ❌ 错的:消费者 break 时资源泄露
async function* bad(path: string) {
  const handle = await openFile(path)
  for await (const chunk of handle) yield chunk
  // 没人 close!
}

// ✅ 对的:finally 兜底
async function* good(path: string) {
  const handle = await openFile(path)
  try {
    for await (const chunk of handle) yield chunk
  } finally {
    await handle.close()
  }
}

7. 关键洞察

7.1 整个项目的"心脏"

async function* 同时被 query、QueryEngine、tool.call、API client、MCP transport 使用 —— 这是项目的"惯用法"。理解了它,Claude Code 60% 的核心代码就能读懂。

7.2 "async function* + 透传" 是组合性的

每一层都是 async generator,消费者可以插入任意中间层(添加 retry、metrics、过滤)。这是函数式组合面向对象继承的根本差异 —— 选前者更灵活。

7.3 "yield 当事件 + return 当结果" 的统一性

Claude Code 的所有"长流程"都遵循这个模式: - 多次 yield = 进度/中间状态 - 最后 return = 最终结果 - throw = 错误终止

前端类比:和 React Suspense 的"throw promise"是同种"lazy evaluation"思路的兄弟。

7.4 "backpressure 来自事件循环"

不需要复杂的流控库 —— JavaScript 单线程 + 事件循环 天然 backpressure。这是 async function* 比 callback 更优雅的关键。

8. 阅读清单

  1. src/query.ts:219-280 —— query() 主循环
  2. src/QueryEngine.ts:184-280 —— class 状态机
  3. src/services/api/claude.ts:1-80 —— SDK 包装
  4. src/Tool.ts:783-792 —— buildTool 工厂(推测 call() 签名)
  5. 📌 src/services/mcp/MCPConnectionManager.tsx —— 看 MCP 怎么用 async generator*
  6. 📌 src/utils/attachments.ts:3997 行 —— 附件处理的最大生成器用法

9. 练习任务

  1. 手写一个 async generator 包装一个 5 秒的 setTimeout 序列,验证 for-await break 能取消
  2. 实现一个 LRU token cache 包在 async generator 外面
  3. 测试 backpressure:写两个 for-await 循环,一个快一个慢,观察慢消费者的内存增长
  4. 思考:如果 Claude Code 改用 RxJS,哪些代码会变短?哪些会变长?项目作者为什么选 async function*?