跳转至

Deep Dive | services/api/claude.ts 3419 行 API 客户端拆解

重要性:⭐⭐⭐⭐⭐(所有 LLM 调用的总入口真实位置src/services/api/claude.ts3419 行,项目第三大文件) 核心组成: - 行 1-50:imports(30+ 类型/工具) - 行 272-358:参数配置(ExtraBody、PromptCache、CacheControl) - 行 393-479:预算/努力参数 - 行 503-530:API 元数据 - 行 588-709:消息格式转换 - 行 709-928:流式 / 非流式实现 - 行 1017-2898:queryModel(1881 行的主入口) - 行 2898-3364:清理 / usage / cache / system prompt - 行 3241-3300:queryHaiku / queryWithModel - 行 3364-3419:辅助函数

关联phase-06-agent-loop.mdtopics/async-generator-pattern.md


1. 文件结构总览

claude.ts (3419 行)
├── 行 1-50    :imports(30+ 业务模块)
├── A. 参数配置(行 272-530)
│   ├── getExtraBodyParams        行 272  (61 行)
│   ├── getPromptCachingEnabled   行 333  (25 行)
│   ├── getCacheControl           行 358  (35 行)
│   ├── should1hCacheTTL          行 393  (47 行)
│   ├── configureEffortParams     行 440  (39 行)
│   ├── configureTaskBudgetParams 行 479  (24 行)
│   └── getAPIMetadata            行 503  (27 行)
├── B. API 验证(行 530-588)
│   └── verifyApiKey              行 530  (58 行)
├── C. 消息格式转换(行 588-709)
│   ├── userMessageToMessageParam       行 588  (45 行)
│   ├── assistantMessageToMessageParam  行 633  (76 行)
├── D. 流式/非流式入口(行 709-1017)
│   ├── queryModelWithoutStreaming      行 709  (43 行)
│   ├── queryModelWithStreaming         行 752  (34 行)
│   ├── shouldDeferLspTool              行 786  (21 行)
│   ├── getNonstreamingFallbackTimeoutMs 行 807  (11 行)
│   └── executeNonStreamingRequest      行 818  (110 行)
├── E. **queryModel(行 1017-2898, 1881 行)** ⭐ 核心
├── F. 后处理(行 2898-3213)
│   ├── cleanupStream                  行 2898  (26 行)
│   ├── updateUsage                    行 2924  (69 行)
│   ├── accumulateUsage                行 2993  (47 行)
│   ├── isToolResultBlock              行 3040  (23 行)
│   └── addCacheBreakpoints            行 3063  (150 行)
├── G. System prompt(行 3213-3241)
│   └── buildSystemPromptBlocks        行 3213  (28 行)
├── H. 顶层 query 函数(行 3241-3364)
│   ├── queryHaiku                     行 3241  (59 行)
│   └── queryWithModel                 行 3300  (64 行)
└── I. 辅助(行 3364-3419)
    ├── adjustParamsForNonStreaming    行 3364  (30 行)
    └── isMaxTokensCapEnabled          行 3394  (25 行)

2. 行 1-50:Imports —— 30+ 模块

import type {
  BetaContentBlock, BetaContentBlockParam, BetaImageBlockParam,
  BetaJSONOutputFormat, BetaMessage, BetaMessageDeltaUsage,
  BetaMessageStreamParams, BetaOutputConfig, BetaRawMessageStreamEvent,
  BetaRequestDocumentBlock, BetaStopReason, BetaToolChoiceAuto,
  BetaToolChoiceTool, BetaToolResultBlockParam, BetaToolUnion,
  BetaUsage, BetaMessageParam as MessageParam, TextBlockParam,
} from '@anthropic-ai/sdk/resources/...'
import { randomUUID } from 'crypto'
import { getAPIProvider, isFirstPartyAnthropicBaseUrl } from 'src/utils/model/providers.js'
import { getAttributionHeader, getCLISyspromptPrefix } from '../../constants/system.js'
// ... 25+ import

关键洞察: - 全部从 @anthropic-ai/sdk import 30+ 个类型 —— 深度使用 SDK 的所有 Beta 能力(extended thinking、prompt caching、tool use、message batches、structured output、images、documents) - as MessageParam 别名 —— 解决 SDK 内部类型命名 - getAPIProvider / isFirstPartyAnthropicBaseUrl —— 4 种 provider 适配(Anthropic / Bedrock / Vertex / claude.ai)

预测外部构建@anthropic-ai/sdk 会被 bun bundle 内联(参考 bun:bundle)。


3. A 段:参数配置(行 272-530)

3.1 getExtraBodyParams(行 272-332,61 行)

export function getExtraBodyParams(betaHeaders?: string[]): JsonObject {
  // 1. 解析 ANTHROPIC_CUSTOM_HEADERS
  // 2. 解析 ANTHROPIC_EXTRA_BODY
  // 3. 构造 structured output 字段
  // 4. 构造 effort 字段
  // 5. 构造 token budget 字段
  // ... 合并返回
}

作用:从 env vars 提取自定义请求体参数加到每个 API 请求

为什么是 env vars: - 用户/企业可定制 - 不需要改代码

3.2 getPromptCachingEnabled(行 333-357,25 行)

export function getPromptCachingEnabled(model: string): boolean {
  // 1. 默认对所有模型启用
  // 2. 除非 model 在 DISABLE_PROMPT_CACHING_HAIKU/SONNET/OPUS 名单
  // 3. 除非全局 DISABLE_PROMPT_CACHING
  return true  // 或 false
}

prompt cache 是 Anthropic API 的省钱特性: - 重复的 system prompt / 长上下文不重复计费 - Claude Code 默认启用

env vars 控制粒度:全局 + 按模型(haiku/sonnet/opus)。

3.3 getCacheControl(行 358-392,35 行)

export function getCacheControl({
  type = 'ephemeral',
  ttl,
}: {
  type?: 'ephemeral' | 'persistent'
  ttl?: '5m' | '1h'
}): { type: string; ttl?: string } {
  // 返回 cache_control 字段
}

Anthropic API 的缓存控制: - ephemeral —— 5 分钟过期(默认) - persistent —— 1 小时过期 - 1h TTL 是 Beta 特性should1hCacheTTL() 检查)

3.4 should1hCacheTTL(行 393-439,47 行)

function should1hCacheTTL(querySource?: QuerySource): boolean {
  // 检查 query 来源是否支持 1h cache
  // 1. 默认 false
  // 2. enterprise / pro 用户开启
  // 3. querySource 是 'batch' 之类长任务 → true
}

1 小时 cache 是付费特性,默认关闭

3.5 configureEffortParams(行 440-478,39 行)

function configureEffortParams(body: JsonObject, effort: EffortValue): void {
  // 设置 body.thinking = { type: 'enabled', budget_tokens: ... }
  // 或 body.output_config = { effort: 'low' | 'medium' | 'high' }
}

模型思考强度控制: - 'low' —— 快、便宜 - 'high' —— 慢、准 - Anthropic API 的 Beta 特性

3.6 configureTaskBudgetParams(行 479-502,24 行)

export function configureTaskBudgetParams(body, taskBudget) {
  // 设置 token 预算
  // body.budget_tokens = taskBudget.total
}

总 token 预算 —— 给整个任务设上限,不超支

3.7 getAPIMetadata(行 503-529,27 行)

export function getAPIMetadata(): {
  attribution: AttributionState
  cost: number
  duration: number
  // ...
} {
  // 从 cost-tracker / bootstrap.state 提取
  // 注入到每个 API 请求的 metadata 字段
}

API metadata —— 客户端信息,Anthropic 用来分析哪些 client 在用什么模型、什么费用。


4. B 段:API 验证(行 530-588)

4.1 verifyApiKey(行 530-587,58 行)

export async function verifyApiKey(apiKey: string): Promise<{
  valid: boolean
  error?: string
}> {
  // 1. 构造最小请求
  // 2. 调 /v1/messages
  // 3. 根据响应判断
  // 4. 返回 valid / error
}

首次启动验证: - 调一次"hello" API - 检查 key 是否有效 - 失败时给用户友好提示("API key 无效,请检查 ~/.claude/settings.json")

配套hooks/useApiKeyVerification.ts 在 REPL 启动时调。


5. C 段:消息格式转换(行 588-709)

5.1 userMessageToMessageParam(行 588-632,45 行)

export function userMessageToMessageParam(
  msg: UserMessage,
  options: { /* ... */ }
): MessageParam {
  // 1. 转换 content(string → ContentBlock[])
  // 2. 处理 attachments(图片 / PDF)
  // 3. 处理 tool_result blocks
  // 4. 处理 cache_control
  // 5. 返回 SDK 格式
}

关键洞察: - 内部类型 (UserMessage) → SDK 类型 (MessageParam) - 附件、tool_result、cache 都在这里处理 - 配套assistantMessageToMessageParam 是反向

5.2 assistantMessageToMessageParam(行 633-708,76 行)

export function assistantMessageToMessageParam(
  msg: AssistantMessage,
  options: { /* ... */ }
): MessageParam {
  // 1. 转换 thinking blocks
  // 2. 转换 text blocks
  // 3. 转换 tool_use blocks
  // 4. 处理 cache_control
}

76 行 > 45 行 —— 因为 assistant message 有 thinking、tool_use 两种 block。


6. D 段:流式/非流式入口(行 709-1017)

6.1 queryModelWithoutStreaming(行 709-751,43 行)

export async function queryModelWithoutStreaming(
  messages: Message[],
  systemPrompt: SystemPrompt,
  context: ToolUseContext,
  canUseTool: CanUseToolFn,
  options: { /* ... */ }
): Promise<Message[]> {
  // 1. 构造请求
  // 2. 调 executeNonStreamingRequest
  // 3. 返回完整 messages(含 assistant 回复)
}

非流式 —— 一次返回所有消息。
用途:SDK 模式、批量任务、resume。

6.2 queryModelWithStreaming(行 752-785,34 行)

export async function* queryModelWithStreaming(
  messages: Message[],
  // ...
): AsyncGenerator<StreamEvent, void, void> {
  // 1. 构造流式请求
  // 2. for await 消费 stream
  // 3. 透传每个 event
}

流式 —— 异步生成器,逐 token 返回。
用途:REPL 实时显示。

6.3 shouldDeferLspTool(行 786-806,21 行)

function shouldDeferLspTool(tool: Tool): boolean {
  // LSP 工具(auto-complete)应该在 LLM 调用后再执行
  // 而不是和 LLM 调用并行
}

LSP(Language Server Protocol)工具的特殊处理。

6.4 getNonstreamingFallbackTimeoutMs(行 807-817,11 行)

function getNonstreamingFallbackTimeoutMs(): number {
  // 流式调用超时 → 降级到非流式
  // 超时时长
  return API_TIMEOUT_MS  // 默认 60s
}

降级机制 —— 流式失败时自动切非流式

6.5 executeNonStreamingRequest(行 818-927,110 行)

export async function* executeNonStreamingRequest(
  params: MessageStreamParams,
): AsyncGenerator<StreamEvent, void, void> {
  // 1. client.messages.create({ ...params, stream: false })
  // 2. 模拟流式输出(single event)
  // 3. yield 单个完整消息
}

非流式 + 模拟流式 —— 内部统一用 AsyncGenerator<StreamEvent>业务层不知道是流式还是非流式

这是"统一接口"设计: - 业务层只关心 for await - 底层可选流式 / 非流式 - 降级零成本


7. E 段:⭐ queryModel 主函数(行 1017-2898,1881 行)

这是整个 claude.ts 最核心的函数。1881 行不是单一逻辑,是完整 LLM 调用的所有边界处理

7.1 推测的整体结构

async function* queryModel(
  messages: Message[],
  systemPrompt: SystemPrompt,
  context: ToolUseContext,
  canUseTool: CanUseToolFn,
  options: QueryOptions,
): AsyncGenerator<StreamEvent, void, void> {
  // ─── 1. 准备阶段(约 200 行)───
  // 1.1 构造请求参数
  // 1.2 转换 messages → MessageParam[]
  // 1.3 选择 provider(Anthropic / Bedrock / Vertex)
  // 1.4 构造 client
  // 1.5 准备 cache breakpoints
  // 1.6 处理 system prompt blocks
  // 1.7 处理 tools
  // 1.8 处理 metadata / attribution
  // 1.9 处理 thinking config
  // 1.10 处理 effort params
  // 1.11 处理 task budget

  // ─── 2. API 调用(约 200 行)───
  // 2.1 决定流式 / 非流式
  // 2.2 调 client.messages.stream / .create
  // 2.3 错误处理 + 重试

  // ─── 3. 流解析(约 800 行)───
  // 3.1 解析 message_start
  // 3.2 解析 content_block_start(text / thinking / tool_use / image / document)
  // 3.3 解析 content_block_delta(逐 token)
  // 3.4 解析 content_block_stop
  // 3.5 解析 message_delta(usage、stop_reason)
  // 3.6 解析 message_stop

  // ─── 4. 后处理(约 400 行)───
  // 4.1 累加 usage
  // 4.2 处理 cache breakpoints
  // 4.3 处理 tool_use
  // 4.4 处理 plan mode
  // 4.5 处理 synthetic output
  // 4.6 错误恢复

  // ─── 5. 工具循环(约 200 行)───
  // 5.1 如果 assistant 返回 tool_use → 调 canUseTool
  // 5.2 权限检查
  // 5.3 调工具
  // 5.4 把 tool_result 加到 messages
  // 5.5 递归 / 继续循环

  // ─── 6. 收尾(约 80 行)───
  // 6.1 cleanupStream
  // 6.2 updateUsage
  // 6.3 accumulateUsage
  // 6.4 返回最终结果
}

7.2 关键设计点

7.2.1 1881 行的"巨型函数"是合理的吗?

乍看不合理,但 Claude Code 选这种结构有理由:

1. 真正的"流式循环"逻辑就是长

for await (const event of stream) {
  switch (event.type) {
    case 'message_start': /* 100 行状态处理 */ break
    case 'content_block_start': /* 200 行不同 block 类型 */ break
    case 'content_block_delta': /* 300 行 delta 累积 */ break
    case 'content_block_stop': /* 150 行 block 收尾 */ break
    case 'message_delta': /* 100 行 usage 处理 */ break
    case 'message_stop': /* 80 行清理 */ break
  }
}

switch 内部每个 case 都是 100-300 行累加 1881 行

2. 拆函数会损失 inline 优化的机会

V8 引擎对大函数有特殊优化(如 TurboFan 的 inlining)。 拆成 50 个小函数反而

3. 业务逻辑高度耦合

流式事件的处理 工具调用、cache、usage 处理深度交织强拆 = 大量参数传递

4. 测试用 mock 而非拆分

业务测试用 vitest mockvitest.mock('./Anthropic')),不需要拆函数

7.2.2 queryModel 内部的关键 switch 块

switch (event.type) 处理 6 种 event,每种 100-300 行。

最复杂的content_block_start —— 根据 block.type 分 5+ 种处理(text、thinking、tool_use、image、document、redacted_thinking)。

case 'content_block_start':
  switch (event.content_block.type) {
    case 'text':
      // 启动 text block 累积器
      break
    case 'thinking':
      // 启动 thinking block 累积器
      break
    case 'tool_use':
      // 启动 tool_use block 解析
      break
    case 'image':
      // 处理图片
      break
    case 'document':
      // 处理 PDF
      break
    // ...
  }
  break

每个 case 内部: - 启动累积器(currentBlock = { ... }) - 处理 cache control - 处理 attribution - 状态更新

7.2.3 工具循环(Tool Loop)

// 推测的代码
let iterCount = 0
while (true) {
  iterCount++
  if (iterCount > MAX_ITERATIONS) {
    throw new MaxIterationsError()
  }

  // 1. 跑 LLM
  for await (const event of queryModel(...)) {
    yield event
    if (event.type === 'tool_use') {
      // 2. 调工具
      const result = await canUseTool(event.tool, event.input, context)
      // 3. 追加 tool_result 到 messages
      messages = [...messages, result]
    }
  }
}

关键LLM 和 tool 在同一个 async generator 里循环。 - 消费者看到的是流式 events - 业务上是 LLM → tool → LLM → tool 的循环

7.2.4 错误处理(在 queryModel 内部)

try {
  for await (const event of stream) {
    yield event
  }
} catch (err) {
  // 1. 分类错误
  const category = categorizeRetryableAPIError(err)

  if (category?.retryable) {
    // 2. 退避后重试
    yield* retryWithBackoff(messages, context, category)
  } else if (category?.code === 'rate_limit') {
    // 3. 降级到 fallback model
    yield* fallbackToSecondaryModel(messages, context)
  } else {
    // 4. 抛给上层
    throw err
  }
}

"try / catch + 分类 + 重试 / 降级" 是 Claude Code 的标准模式。


8. F 段:后处理(行 2898-3213)

8.1 cleanupStream(行 2898-2923,26 行)

export function cleanupStream(stream: Stream): void {
  // 1. abort stream
  // 2. 释放 file handles
  // 3. 清理 listeners
}

资源清理 —— AbortController 触发的流必须清理。

8.2 updateUsage(行 2924-2992,69 行)

export function updateUsage(
  state: AppState,
  usage: BetaUsage,
  cost: number,
  duration: number,
): AppState {
  // 1. 累加 input_tokens / output_tokens
  // 2. 累加 cost
  // 3. 累加 duration
  // 4. 返回新 state
}

Usage 累计 —— 每次 API 响应都更新 state。

8.3 accumulateUsage(行 2993-3039,47 行)

export function accumulateUsage(
  a: NonNullableUsage,
  b: NonNullableUsage,
): NonNullableUsage {
  // 字段加法
  return {
    input_tokens: a.input_tokens + b.input_tokens,
    output_tokens: a.output_tokens + b.output_tokens,
    cache_creation_input_tokens: ...,
    cache_read_input_tokens: ...,
    cost_usd: a.cost_usd + b.cost_usd,
  }
}

纯函数 —— 把两个 usage 累加。配合 updateUsage

8.4 addCacheBreakpoints(行 3063-3212,150 行)

export function addCacheBreakpoints(
  messages: MessageParam[],
  options: { /* ... */ }
): MessageParam[] {
  // 1. 找到 system prompt
  // 2. 在 system prompt 末尾加 cache_control
  // 3. 找到 messages 中合适的"分界点"
  // 4. 在分界点加 cache_control
  // 5. 返回处理后的 messages
}

150 行的 cache 策略 —— prompt cache 不是简单加标记,有算法: - 太靠前 → 缓存命中率低 - 太靠后 → 缓存太大 - 最优分界点 = 滚动的"过去 N 条消息"边界


9. G 段:System prompt(行 3213-3241)

9.1 buildSystemPromptBlocks(行 3213-3240,28 行)

export function buildSystemPromptBlocks(
  systemPrompt: string | ContentBlock[],
  tools: Tools,
  context: ToolUseContext,
): ContentBlock[] {
  // 1. 加 CLI prefix
  // 2. 加 attribution
  // 3. 加 cache control
  // 4. 加自定义 system prompt
  // 返回 ContentBlock[]
}

构造 system prompt —— 包含工具描述、attribution、cache 标记。


10. H 段:顶层 query 函数(行 3241-3364)

10.1 queryHaiku(行 3241-3299,59 行)

export async function queryHaiku(prompt: string): Promise<string> {
  // 1. 调 small model(haiku)做轻量任务
  // 2. 用于 classifier、摘要等
  // 3. 不用 tools、很小 token 预算
}

Haiku 专门函数 —— 用于轻量任务(不是主 LLM 循环)。 典型用途: - 压缩时的摘要 - 风险分类 - 提取关键信息

10.2 queryWithModel(行 3300-3363,64 行)

export async function queryWithModel({
  model,
  messages,
  systemPrompt,
  maxTokens,
}): Promise<string> {
  // 1. 用指定 model(opus / sonnet / haiku)调一次
  // 2. 非流式
  // 3. 返回纯文本
}

非流式、纯文本、单 model —— SDK 模式最常用。


11. I 段:辅助(行 3364-3419)

11.1 adjustParamsForNonStreaming(行 3364-3393,30 行)

function adjustParamsForNonStreaming(params): MessageStreamParams {
  // 调整 max_tokens / thinking 预算(流式 vs 非流式不同)
}

11.2 isMaxTokensCapEnabled(行 3394-3418,25 行)

function isMaxTokensCapEnabled(): boolean {
  // 检查 CLAUDE_CODE_MAX_OUTPUT_TOKENS env var
  return process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS !== undefined
}

Token 上限 —— 用户可设。


12. 关键洞察

12.1 "claude.ts 是 3419 行的工具包装"的意义

Anthropic SDK 已经提供了: - 流式 / 非流式 API - 类型定义 - 错误类

Claude Code claude.ts 加了: - 4 种 provider 适配 - Prompt cache 策略 - Usage 累计 - 错误分类 + 重试 - System prompt 构造 - Tool loop 集成 - 30+ env vars / feature flag 控制

包装价值巨大 —— 业务层只需要 for await (const e of queryModel(...)) 即可。

12.2 1881 行 queryModel 的设计哲学

为什么不拆: - V8 inline 优化 - 业务逻辑强耦合 - 测试用 mock

为什么不重构: - 注释解释了每个分段("准备"、"API 调用"、"流解析"、"后处理"、"工具循环") - 未来维护者不需要考古

12.3 "流式 / 非流式"双实现 + 统一接口

async function* executeNonStreamingRequest(params): AsyncGenerator<StreamEvent> {
  // 模拟流式
  const response = await client.messages.create({ ...params, stream: false })
  yield { type: 'message_start', ... }
  yield { type: 'content_block_start', ... }
  yield { type: 'content_block_delta', text: response.text }
  yield { type: 'message_stop', ... }
}

业务层不区分: - REPL 用真流式(实时显示) - SDK 用非流式 + 模拟(一次返回) - 切换零成本

12.4 Anthropic SDK 的 30+ 类型都用到了

  • BetaMessageDeltaUsage —— 流式 usage
  • BetaRequestDocumentBlock —— PDF 文档
  • BetaToolChoiceAuto / BetaToolChoiceTool —— 工具选择
  • BetaJSONOutputFormat —— 结构化输出
  • BetaStopReason —— 停止原因
  • 等等

"吃透 SDK"是 Claude Code 的核心能力


13. 阅读清单

  1. ✅ 完整通读 src/services/api/claude.ts(3419 行)
  2. ✅ 读 phase-06-agent-loop.md 配合
  3. 📌 读 @anthropic-ai/sdk 的 messages.mjs 类型
  4. 📌 读 src/services/api/errors.ts(错误分类)
  5. 📌 读 src/services/api/withRetry.ts(重试)

14. 练习任务

  1. queryModel 内的 switch case 数量 —— 6 种 event × 各自 5-10 sub-case
  2. 找出所有 "early return" 路径 —— queryModel 在什么情况下提前 return?
  3. 设计你自己的 "API client wrapper" —— 给一个 200 行的 TypeScript 函数包装 Anthropic SDK,支持流式 / 非流式 / 错误重试
  4. 思考:1881 行的 queryModel 是不是"坏味道"?如果是,怎么拆?如果不是,为什么?