
Walkthrough | Building an Async Stream

Difficulty: ⭐⭐  Time: ~1h  Goal: hand-write an async stream utility similar to the Anthropic SDK


1. What Is an Async Stream

Async Stream = asynchronous + streaming + consumable. This walkthrough:
- mimics the Anthropic SDK's streamMessage
- parses SSE (Server-Sent Events)
- provides helpers


2. Goals

Hand-write a MiniAnthropic class with:
- messages.create(): a single, non-streaming request
- messages.stream(): an async event stream
- a text-aggregation helper
- error handling


3. Full Code

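// mini-anthropic.ts

type Message = {
  // The Messages API accepts only 'user' and 'assistant' turns here;
  // the system prompt goes in the top-level `system` parameter instead.
  role: 'user' | 'assistant'
  content: string
}

type Usage = {
  input_tokens: number
  output_tokens: number
}

type CreateParams = {
  model: string
  max_tokens: number
  messages: Message[]
  system?: string
}

type Response = {
  id: string
  content: Array<{ type: 'text'; text: string }>
  usage: Usage
  // null inside the message_start event, set once generation ends
  stop_reason: 'end_turn' | 'max_tokens' | 'stop_sequence' | null
  model: string
}

type StreamEvent =
  | { type: 'message_start'; message: Response }
  | { type: 'content_block_start'; index: number; content_block: unknown }
  | { type: 'content_block_delta'; index: number; delta: { type: 'text_delta'; text: string } }
  | { type: 'content_block_stop'; index: number }
  // usage.output_tokens here is cumulative and supersedes the value in message_start
  | { type: 'message_delta'; delta: { stop_reason: string | null }; usage: { output_tokens: number } }
  | { type: 'message_stop' }

export class AnthropicError extends Error {
  constructor(public status: number, message: string) {
    super(message)
    this.name = 'AnthropicError'
  }
}

// Parse one SSE event block ("event: <type>\ndata: <json>").
// Returns null for blocks missing either field or carrying malformed JSON.
// Other event types (e.g. ping) are passed through unchecked.
function parseSSEEvent(block: string): StreamEvent | null {
  let eventType = ''
  let data = ''
  for (const line of block.split('\n')) {
    if (line.startsWith('event:')) eventType = line.slice(6).trim()
    else if (line.startsWith('data:')) data = line.slice(5).trim()
  }
  if (!eventType || !data) return null

  let parsed: any
  try {
    parsed = JSON.parse(data)
  } catch {
    return null // skip malformed payloads
  }
  // The API can also report an error mid-stream as an `error` event.
  // There is no HTTP status at that point, so status 0 is used here.
  if (parsed.type === 'error') {
    throw new AnthropicError(0, parsed.error?.message ?? data)
  }
  return parsed as StreamEvent
}

export class MiniAnthropic {
  private apiKey: string
  private baseURL: string

  constructor(apiKey: string, baseURL = 'https://api.anthropic.com') {
    this.apiKey = apiKey
    this.baseURL = baseURL
  }

  // Arrow functions capture the instance, so `this` stays bound when called
  // as client.messages.create(...) or client.messages.stream(...).
  messages = {
    create: async (params: CreateParams): Promise<Response> => {
      const response = await fetch(`${this.baseURL}/v1/messages`, {
        method: 'POST',
        headers: this.headers(false),
        body: JSON.stringify(params),
      })

      if (!response.ok) {
        throw new AnthropicError(response.status, await response.text())
      }

      return (await response.json()) as Response
    },

    // A generator cannot be an arrow function, and a plain `function*` here
    // would see `this === client.messages`, so delegate to a private method.
    stream: (params: CreateParams): AsyncGenerator<StreamEvent> => this.streamEvents(params),
  }

  private async *streamEvents(params: CreateParams): AsyncGenerator<StreamEvent> {
    const response = await fetch(`${this.baseURL}/v1/messages`, {
      method: 'POST',
      headers: this.headers(true),
      body: JSON.stringify({ ...params, stream: true }),
    })

    if (!response.ok || !response.body) {
      throw new AnthropicError(response.status, await response.text())
    }

    const reader = response.body.getReader()
    const decoder = new TextDecoder()
    let buffer = ''

    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) {
          // Flush bytes still held by the decoder and terminate a trailing event
          buffer += decoder.decode() + '\n\n'
        } else {
          buffer += decoder.decode(value, { stream: true })
        }

        // SSE: events are separated by a blank line (\n\n);
        // the last piece may be incomplete, so keep it in the buffer
        const blocks = buffer.split('\n\n')
        buffer = blocks.pop() ?? ''

        for (const block of blocks) {
          const event = parseSSEEvent(block)
          if (event) yield event
        }

        if (done) break
      }
    } finally {
      // Runs on completion, on error, and when the consumer breaks out of
      // `for await` early; cancelling releases the HTTP connection.
      await reader.cancel().catch(() => {})
    }
  }

  // helper: collect the streamed text and the final token usage
  async collectText(stream: AsyncIterable<StreamEvent>): Promise<{
    text: string
    usage: Usage | null
  }> {
    let text = ''
    let usage: Usage | null = null

    for await (const event of stream) {
      if (event.type === 'message_start') {
        // input_tokens is final here; output_tokens is only a starting value
        usage = { ...event.message.usage }
      } else if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
        text += event.delta.text
      } else if (event.type === 'message_delta' && usage) {
        // message_delta carries the cumulative output token count
        usage.output_tokens = event.usage.output_tokens
      }
    }

    return { text, usage }
  }

  // helper: retry with exponential backoff.
  // Note: maxRetries counts total attempts (the first call included).
  async withRetry<T>(fn: () => Promise<T>, maxRetries = 3): Promise<T> {
    let lastError: unknown
    for (let i = 0; i < maxRetries; i++) {
      try {
        return await fn()
      } catch (e) {
        lastError = e
        // A 4xx will fail again unchanged, so don't retry it (429 excepted)
        if (e instanceof AnthropicError && e.status < 500 && e.status !== 429) {
          throw e
        }
        // Back off 1s, 2s, 4s, ... but don't sleep after the final attempt
        if (i < maxRetries - 1) {
          await new Promise((r) => setTimeout(r, 1000 * 2 ** i))
        }
      }
    }
    throw lastError
  }

  private headers(stream: boolean): Record<string, string> {
    return {
      'Content-Type': 'application/json',
      'x-api-key': this.apiKey,
      'anthropic-version': '2023-06-01',
      ...(stream ? { 'Accept': 'text/event-stream' } : {}),
    }
  }
}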

~150 lines


4. Usage Examples

4.1 Single (non-streaming) call

const client = new MiniAnthropic(process.env.ANTHROPIC_API_KEY!)

const response = await client.messages.create({
  model: 'claude-sonnet-4-6',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Hello' }],
})

console.log(response.content[0].text)

Simple: one request, one complete response.

4.2 Streaming + text collection

const stream = client.messages.stream({
  model: 'claude-sonnet-4-6',
  max_tokens: 1024,
  messages: [{ role: 'user', content: 'Tell a story' }],
})

const { text, usage } = await client.collectText(stream)
console.log('Text:', text)
console.log('Tokens:', usage)

Stream, then collect: the full text and token usage are returned once the stream ends.

4.3 Real-time streaming

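// params: same shape as in 4.1
for await (const event of client.messages.stream(params)) {
  // Only text deltas carry `text`; start/stop and usage events are skipped
  if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
    process.stdout.write(event.delta.text)
  }
}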

Real time: each text fragment is printed as soon as it arrives.

4.4 Retry

const response = await client.withRetry(() =>
  client.messages.create(params)
)

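Retry: transient failures (5xx, 429, network errors) are retried with exponential backoff.


5. Five Key Design Points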

5.1 Async Iterator

async function* stream() { ... }

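An async generator (async function*) is the stream: it yields values one at a time, and the consumer pulls them with for await.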
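A minimal sketch of the pattern on its own, with no network involved: an async generator yields values one at a time, and `for await` pulls them in order. The `delay`, `ticker` and `consume` names and the 10 ms timing are illustrative assumptions standing in for network chunks.

```typescript
// Hypothetical helper: resolve after `ms` milliseconds
function delay(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

// An async generator: each `yield` hands one value to the consumer,
// and execution pauses until the consumer asks for the next one.
async function* ticker(words: string[], ms: number): AsyncGenerator<string> {
  for (const word of words) {
    await delay(ms) // simulate waiting for the next network chunk
    yield word
  }
}

// Consume the stream incrementally with `for await`
async function consume(): Promise<string[]> {
  const received: string[] = []
  for await (const word of ticker(['Hello', ', ', 'world'], 10)) {
    received.push(word) // handle each piece as soon as it arrives
  }
  return received
}
```

`consume()` resolves to `['Hello', ', ', 'world']`; each piece is handled as soon as its delay elapses rather than after the whole sequence is ready.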

5.2 SSE Parsing

// event: type\ndata: json\n\n

Standard SSE framing: each event is a block of field lines (event:, data:) terminated by a blank line.
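The buffering matters because network chunks do not line up with event boundaries. The sketch below (a standalone class; `SSEParser` is a hypothetical name) keeps the incomplete tail in a buffer and only emits an event once its terminating blank line has arrived. Like the parser in the full code, it assumes one `data:` line per event and `\n` line endings.

```typescript
type SSEEvent = { event: string; data: string }

// Incremental SSE parser: feed it raw text chunks in arrival order.
class SSEParser {
  private buffer = ''

  // Returns the events completed by this chunk; a partial event stays buffered.
  push(chunk: string): SSEEvent[] {
    this.buffer += chunk
    const blocks = this.buffer.split('\n\n')
    this.buffer = blocks.pop() ?? '' // last piece may be an incomplete event
    const out: SSEEvent[] = []
    for (const block of blocks) {
      let event = ''
      let data = ''
      for (const line of block.split('\n')) {
        if (line.startsWith('event:')) event = line.slice(6).trim()
        else if (line.startsWith('data:')) data = line.slice(5).trim()
      }
      if (event && data) out.push({ event, data })
    }
    return out
  }
}
```

For example, if `message_start` arrives split across two chunks, the first `push` returns nothing and the second returns the complete event.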

5.3 Collection helper

async collectText(stream) { ... }

A helper hides the event-by-event loop when the caller only needs the final text and token usage.
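To see what the helper has to combine, here is a self-contained sketch that runs the same collection logic over a hand-written event sequence instead of a live request. The event shapes follow the streaming format modeled in the full code: `message_start` carries the initial `usage`, and `message_delta` carries the cumulative `output_tokens`. The names `collect` and `fakeStream` and all token numbers are invented for illustration.

```typescript
type Usage = { input_tokens: number; output_tokens: number }

// Trimmed-down event union, just enough for this sketch
type StreamEvent =
  | { type: 'message_start'; message: { usage: Usage } }
  | { type: 'content_block_delta'; index: number; delta: { type: 'text_delta'; text: string } }
  | { type: 'message_delta'; usage: { output_tokens: number } }
  | { type: 'message_stop' }

// Same logic as collectText: concatenate text deltas and track usage
async function collect(stream: AsyncIterable<StreamEvent>): Promise<{ text: string; usage: Usage | null }> {
  let text = ''
  let usage: Usage | null = null
  for await (const event of stream) {
    if (event.type === 'message_start') usage = { ...event.message.usage }
    else if (event.type === 'content_block_delta') text += event.delta.text
    else if (event.type === 'message_delta' && usage) usage.output_tokens = event.usage.output_tokens
  }
  return { text, usage }
}

// Fake stream with illustrative token counts
async function* fakeStream(): AsyncGenerator<StreamEvent> {
  yield { type: 'message_start', message: { usage: { input_tokens: 12, output_tokens: 1 } } }
  yield { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'Once ' } }
  yield { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'upon a time' } }
  yield { type: 'message_delta', usage: { output_tokens: 5 } }
  yield { type: 'message_stop' }
}
```

`collect(fakeStream())` resolves to the text `'Once upon a time'` with usage `{ input_tokens: 12, output_tokens: 5 }`: the output count comes from `message_delta`, not from `message_start`.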

5.4 Retry with backoff

await new Promise((r) => setTimeout(r, 1000 * 2 ** i))

Exponential backoff: the wait doubles with every attempt (1 s, 2 s, 4 s, ...).
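With the 1000 ms base used above, attempt i waits 1000 × 2^i ms. The sketch lists the waits for a given number of attempts, assuming no sleep after the last attempt (there is nothing left to wait for). `backoffDelays` is a hypothetical name, and the `maxDelayMs` cap is an added assumption, not part of the original formula.

```typescript
// Delays (ms) between attempts for exponential backoff: base * 2^i.
// No delay follows the final attempt, so n attempts produce n - 1 waits.
// maxDelayMs is a hypothetical cap that keeps late waits bounded.
function backoffDelays(attempts: number, baseMs = 1000, maxDelayMs = 30_000): number[] {
  const delays: number[] = []
  for (let i = 0; i < attempts - 1; i++) {
    delays.push(Math.min(baseMs * 2 ** i, maxDelayMs))
  }
  return delays
}
```

`backoffDelays(4)` gives `[1000, 2000, 4000]`; with 7 attempts the last wait would be 32000 ms and is capped to 30000.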

5.5 No retry on 4xx

if (e.status < 500 && e.status !== 429) throw e

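Smart retry: a 4xx means the request itself is invalid and will fail the same way again, except 429 (rate limited), which can succeed after a wait.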
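The rule fits in a small predicate. The condition above (`status < 500 && status !== 429`, meaning "give up") is exactly the negation of the function below; `isRetryable` is a hypothetical name for this sketch.

```typescript
// Retry only when waiting might help:
// - 429: rate limited, likely to succeed later
// - 5xx: server-side failure, possibly transient
// Any other 4xx (400 bad request, 401 bad key, ...) fails the same way again.
function isRetryable(status: number): boolean {
  return status === 429 || status >= 500
}
```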


6. Five Extensions

6.1 Token accumulation

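class TokenCounter {
  private total: Usage = { input_tokens: 0, output_tokens: 0 }

  add(usage: Usage) {
    this.total.input_tokens += usage.input_tokens
    this.total.output_tokens += usage.output_tokens
  }

  // Return a copy so callers cannot mutate the running total
  get(): Usage { return { ...this.total } }

  // Cost in USD, using the cost() function from 6.2
  getCost(model: string): number { return cost(model, this.total) }
}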

Counter: accumulates usage across multiple requests.

6.2 Cost calculation

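// USD per 1M tokens
const PRICING: Record<string, { in: number; out: number }> = {
  'claude-sonnet-4-6': { in: 3, out: 15 },
}

function cost(model: string, usage: Usage): number {
  const p = PRICING[model]
  if (!p) throw new Error(`no pricing for model: ${model}`)
  return (usage.input_tokens * p.in + usage.output_tokens * p.out) / 1_000_000
}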

Prices are in USD per million tokens, hence the division by 1,000,000.
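A worked example combining the counter and the price table: accumulate usage across requests, then price the total once. The prices are the ones listed above; the two request sizes are invented. With 1,500 + 500 input tokens and 200 + 300 output tokens: (2,000 × 3 + 500 × 15) / 1,000,000 = (6,000 + 7,500) / 1,000,000 = 0.0135 USD. `PRICES` and `totalCost` are hypothetical names for this sketch.

```typescript
type Usage = { input_tokens: number; output_tokens: number }

// USD per 1M tokens (values from the table above)
const PRICES: Record<string, { in: number; out: number }> = {
  'claude-sonnet-4-6': { in: 3, out: 15 },
}

function totalCost(model: string, usages: Usage[]): number {
  const p = PRICES[model]
  if (!p) throw new Error(`no pricing for model: ${model}`)
  // Sum tokens first, then convert to dollars once
  const input = usages.reduce((sum, u) => sum + u.input_tokens, 0)
  const output = usages.reduce((sum, u) => sum + u.output_tokens, 0)
  return (input * p.in + output * p.out) / 1_000_000
}
```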

6.3 Multiple API key rotation

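class ClientWithRotation {
  private idx = 0

  constructor(private keys: string[]) {
    if (keys.length === 0) throw new Error('need at least one API key')
  }

  // Key to use for the next request
  current(): string {
    return this.keys[this.idx]
  }

  // Advance to the next key, wrapping around at the end
  rotate() {
    this.idx = (this.idx + 1) % this.keys.length
  }
}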

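Rotation: cycle through the keys round-robin, e.g. move to the next key when the current one is rate limited (429).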
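One way rotation might plug into a request: try the current key, and on a 429 move to the next key and try again, giving up once every key has been tried. `KeyRing`, `withKeyRotation` and `HttpError` are hypothetical names for this sketch, and the request function is injected so the example runs without network access.

```typescript
// Minimal error carrying an HTTP status (hypothetical, for this sketch)
class HttpError extends Error {
  constructor(readonly status: number) {
    super(`HTTP ${status}`)
  }
}

// Round-robin over a fixed list of API keys
class KeyRing {
  private idx = 0
  constructor(private keys: string[]) {
    if (keys.length === 0) throw new Error('need at least one API key')
  }
  current(): string { return this.keys[this.idx] }
  rotate(): void { this.idx = (this.idx + 1) % this.keys.length }
  get size(): number { return this.keys.length }
}

// Call `request` with the current key; on 429, rotate and try the next key.
// Each key is tried at most once per call; other errors are rethrown at once.
async function withKeyRotation<T>(ring: KeyRing, request: (key: string) => Promise<T>): Promise<T> {
  let lastError: unknown
  for (let i = 0; i < ring.size; i++) {
    try {
      return await request(ring.current())
    } catch (e) {
      if ((e as { status?: number } | null)?.status !== 429) throw e
      lastError = e
      ring.rotate()
    }
  }
  throw lastError // every key was rate limited
}
```

The ring keeps its position between calls, so after a key is rate limited, later requests start on the next key.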

6.4 Streaming cancel

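async function* streamWithCancel<T>(
  source: AsyncIterable<T>,
  signal: AbortSignal,
): AsyncGenerator<T> {
  for await (const event of source) {
    // Checked between events; returning here also calls source.return()
    if (signal.aborted) return
    yield event
  }
}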

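Cancellation: stop consuming once an AbortSignal fires. The check only runs when the next event arrives; aborting an in-flight request immediately would also require passing the signal to fetch.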
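A runnable demonstration with a fake source, so no network is needed. It shows two things: the consumer stops after the abort, and returning from `for await` calls the source's `return()`, which runs the source's `finally` block (where a real stream would cancel its reader). `countForever`, `cleanup` and `demo` are hypothetical names.

```typescript
async function* streamWithCancel<T>(source: AsyncIterable<T>, signal: AbortSignal): AsyncGenerator<T> {
  for await (const event of source) {
    if (signal.aborted) return // leaving the loop calls source.return()
    yield event
  }
}

// Hypothetical infinite source that records whether it was cleaned up
const cleanup = { closed: false }
async function* countForever(): AsyncGenerator<number> {
  try {
    for (let i = 0; ; i++) yield i
  } finally {
    cleanup.closed = true // a real stream would cancel its reader here
  }
}

async function demo(): Promise<number[]> {
  const controller = new AbortController()
  const seen: number[] = []
  for await (const n of streamWithCancel(countForever(), controller.signal)) {
    seen.push(n)
    if (n === 2) controller.abort() // e.g. the user pressed Ctrl+C
  }
  return seen
}
```

`demo()` resolves to `[0, 1, 2]`. One extra value (3) is pulled from the source before the abort is noticed and then discarded, which is the between-events limitation described above.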

6.5 Custom retry policy

interface RetryPolicy {
  shouldRetry(error: Error, attempt: number): boolean
  delay(attempt: number): number
}

Pluggable policy: decide per error and per attempt whether to retry, and how long to wait.
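One possible implementation of the interface, plus a generic loop that uses it. `ExponentialJitterPolicy` and `retryWithPolicy` are hypothetical names. The jitter variant shown is "full jitter" (a random wait between 0 and the exponential cap), one common choice and not necessarily what the real SDK does. The random source is injectable so the delays can be checked deterministically.

```typescript
interface RetryPolicy {
  shouldRetry(error: Error, attempt: number): boolean
  delay(attempt: number): number
}

class ExponentialJitterPolicy implements RetryPolicy {
  constructor(
    private maxAttempts = 3,
    private baseMs = 1000,
    private maxDelayMs = 30_000,
    private random: () => number = Math.random, // injectable for tests
  ) {}

  shouldRetry(error: Error, attempt: number): boolean {
    if (attempt + 1 >= this.maxAttempts) return false // out of attempts
    const status = (error as Error & { status?: number }).status
    // No status (e.g. a network failure): retry; otherwise only 429 and 5xx
    return status === undefined || status === 429 || status >= 500
  }

  // Full jitter: uniform in [0, min(cap, base * 2^attempt))
  delay(attempt: number): number {
    return Math.floor(this.random() * Math.min(this.maxDelayMs, this.baseMs * 2 ** attempt))
  }
}

// Generic retry loop driven entirely by the policy
async function retryWithPolicy<T>(fn: () => Promise<T>, policy: RetryPolicy): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (e) {
      const error = e instanceof Error ? e : new Error(String(e))
      if (!policy.shouldRetry(error, attempt)) throw error
      await new Promise((r) => setTimeout(r, policy.delay(attempt)))
    }
  }
}
```

Jitter spreads retries from many clients over time, so they do not all hit the server again at the same instant after an outage.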


7. Five Key Insights

  1. async function* = stream
  2. SSE parsing = buffer + split on blank lines
  3. collectText = a helper that simplifies consumption
  4. Retry with backoff = essential
  5. No retry on 4xx = smart

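8. Comparison with the Real SDK

| Dimension | Mini | Real |
|---|---|---|
| Lines of code | 150 | 3419 |
| Stream | basic | complete |
| Retry | simple | jitter + policy |
| Caching | none | automatic |
| Multiple keys | none | rotation |
| Beta features | none | complete |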

Simplified version vs. the real implementation.


9. Five Exercises

  1. Add token accumulation: a TokenCounter class
  2. Add cost calculation: a PRICING table
  3. Add streaming cancellation: AbortController
  4. Add a custom retry policy: an interface
  5. Add multi-key rotation: rotate on 429

Five steps.


10. Summary

Building an async stream = async function* + SSE + helpers

Core:
- a ~150-line simplified version
- stream + create
- collectText helper
- retry with backoff

Next steps:
- use it in QueryEngine
- add token accumulation
- add cancellation