跳转至

Walkthrough | 手写 Async Generator

难度:⭐⭐⭐ 时间:~1h 目标:理解 async generator 模式,实现一个简单的消息流


1. Async Generator 是什么

Async Generator = TypeScript 的 async function* + yield。 - 异步 + 流式 - 懒求值 - 可取消 - backpressure 友好

Claude Code 用例: - API 流式响应 - Bash 输出 - 进度通知


2. 目标

手写一个简化版 async generator: - 模拟 API 流式响应 - 处理 cancel - 处理 backpressure - 错误处理


3. 完整代码

// mini-async-stream.ts

// 1. 简单流
export async function* simpleStream(items: string[]) {
  for (const item of items) {
    await new Promise((r) => setTimeout(r, 100))  // 模拟延迟
    yield item
  }
}

// 2. 带 cancel
export async function* streamWithCancel(
  items: string[],
  signal: AbortSignal,
) {
  for (const item of items) {
    if (signal.aborted) {
      console.log('cancelled')
      return
    }
    await new Promise((r) => setTimeout(r, 100))
    yield item
  }
}

// 3. 模拟 Claude API 流
export async function* mockClaudeStream(prompt: string): AsyncGenerator<{
  type: 'text' | 'done'
  content: string
}> {
  // 模拟响应
  const response = `Response to: ${prompt}`

  for (const char of response) {
    await new Promise((r) => setTimeout(r, 20))
    yield { type: 'text', content: char }
  }

  yield { type: 'done', content: '' }
}

// 4. 带错误
export async function* streamWithError(): AsyncGenerator<any> {
  try {
    yield 'a'
    yield 'b'
    throw new Error('mid-stream error')
  } catch (e) {
    yield { type: 'error', error: e }
  }
}

// 5. Backpressure 友好
export async function* backpressureStream(
  source: AsyncIterable<number>,
  process: (n: number) => Promise<void>,
): AsyncGenerator<void> {
  for await (const item of source) {
    await process(item)  // 等待消费者就绪
    yield  // 让出控制
  }
}

// 6. 实际用法
async function main() {
  console.log('1. Simple stream:')
  for await (const item of simpleStream(['a', 'b', 'c'])) {
    console.log(item)
  }

  console.log('\n2. With cancel:')
  const controller = new AbortController()
  setTimeout(() => controller.abort(), 150)

  try {
    for await (const item of streamWithCancel(
      ['a', 'b', 'c', 'd', 'e'],
      controller.signal,
    )) {
      console.log(item)
    }
  } catch (e) {
    // ignore
  }

  console.log('\n3. Mock Claude:')
  for await (const event of mockClaudeStream('Hello')) {
    if (event.type === 'text') process.stdout.write(event.content)
    if (event.type === 'done') console.log()
  }
}

main()

~100 行


4. Async Generator 基础

4.1 声明

async function* gen() {
  yield 1
  yield 2
  yield 3
}

async function*

4.2 使用

for await (const item of gen()) {
  console.log(item)
}

for await...of

4.3 取消

const controller = new AbortController()
setTimeout(() => controller.abort(), 1000)
for await (const item of gen(controller.signal)) {
  // 会被取消
}

AbortController

4.4 错误处理

try {
  for await (const item of gen()) {
    // ...
  }
} catch (e) {
  console.error(e)
}

try/catch


5. Claude API 流式示例

import Anthropic from '@anthropic-ai/sdk'

const client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY! })

async function streamConversation(prompt: string) {
  const stream = await client.messages.create({
    model: 'claude-sonnet-4-6',
    max_tokens: 1024,
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  })

  for await (const event of stream) {
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      process.stdout.write(event.delta.text)
    }
  }
  console.log()  // 换行
}

streamConversation('Tell a joke')

真实


6. 5 个 generator 模式

6.1 简单 yield

async function* gen() {
  yield 1
  yield 2
}

基础

6.2 动态 yield

async function* gen(items: any[]) {
  for (const item of items) {
    yield item
  }
}

动态

6.3 无限流

async function* infinite() {
  let i = 0
  while (true) {
    yield i++
  }
}

无限

6.4 Transform

async function* upper(source: AsyncIterable<string>) {
  for await (const s of source) {
    yield s.toUpperCase()
  }
}

transform

6.5 Merge

async function* merge<T>(...sources: AsyncIterable<T>[]) {
  // round-robin 或并发
}

merge


7. 5 个 Claude Code 用例

7.1 API 流

const stream = await client.messages.create({ ..., stream: true })
for await (const event of stream) { ... }

API

7.2 Bash 输出

const proc = spawn(command)
for await (const chunk of proc.stdout) { ... }

Bash

7.3 Progress 通知

async function* progress(total: number) {
  for (let i = 0; i < total; i++) {
    yield { progress: i, total }
    await work()
  }
}

Progress

7.4 消息流

async function* messages(turns: Turn[]) {
  for (const turn of turns) {
    yield turn.user
    yield* assistantResponse(turn)
  }
}

Messages

7.5 File 读取

async function* readLines(path: string) {
  const stream = createReadStream(path, { encoding: 'utf-8' })
  const rl = createInterface({ input: stream })
  for await (const line of rl) yield line
}

File


8. 5 个关键洞察

  1. async function* 是声明
  2. for await...of 是消费
  3. yield 是产出
  4. AbortController 是取消
  5. backpressure 自动

9. 5 个练习

  1. 加 map —— map<T, U>(source, fn)
  2. 加 filter —— filter<T>(source, pred)
  3. 加 take —— take<T>(source, n)
  4. 加 merge —— merge<T>(...sources)
  5. 加 retry —— 出错时重试

5 步


10. 总结

手写 Async Generator = 理解 async + 流式 + 懒求值

核心: - async function* 声明 - for await...of 消费 - yield 产出 - AbortController 取消 - 自动 backpressure

下一步: - 用在 Claude API 流式 - 用在 Bash 输出 - 用在 progress 通知