Deep Dive | services/api/claude.ts 3419 行 API 客户端拆解¶
重要性:⭐⭐⭐⭐⭐(所有 LLM 调用的总入口) 真实位置:
src/services/api/claude.ts(3419 行,项目第三大文件) 核心组成: - 行 1-50:imports(30+ 类型/工具) - 行 272-358:参数配置(ExtraBody、PromptCache、CacheControl) - 行 393-479:预算/努力参数 - 行 503-530:API 元数据 - 行 588-709:消息格式转换 - 行 709-928:流式 / 非流式实现 - 行 1017-2898:queryModel(1881 行的主入口) - 行 2898-3364:清理 / usage / cache / system prompt - 行 3241-3300:queryHaiku / queryWithModel - 行 3364-3419:辅助函数
1. 文件结构总览¶
claude.ts (3419 行)
│
├── 行 1-50 :imports(30+ 业务模块)
│
├── A. 参数配置(行 272-530)
│ ├── getExtraBodyParams 行 272 (61 行)
│ ├── getPromptCachingEnabled 行 333 (25 行)
│ ├── getCacheControl 行 358 (35 行)
│ ├── should1hCacheTTL 行 393 (47 行)
│ ├── configureEffortParams 行 440 (39 行)
│ ├── configureTaskBudgetParams 行 479 (24 行)
│ └── getAPIMetadata 行 503 (27 行)
│
├── B. API 验证(行 530-588)
│ └── verifyApiKey 行 530 (58 行)
│
├── C. 消息格式转换(行 588-709)
│ ├── userMessageToMessageParam 行 588 (45 行)
│ ├── assistantMessageToMessageParam 行 633 (76 行)
│
├── D. 流式/非流式入口(行 709-1017)
│ ├── queryModelWithoutStreaming 行 709 (43 行)
│ ├── queryModelWithStreaming 行 752 (34 行)
│ ├── shouldDeferLspTool 行 786 (21 行)
│ ├── getNonstreamingFallbackTimeoutMs 行 807 (11 行)
│ └── executeNonStreamingRequest 行 818 (110 行)
│
├── E. **queryModel(行 1017-2898, 1881 行)** ⭐ 核心
│
├── F. 后处理(行 2898-3213)
│ ├── cleanupStream 行 2898 (26 行)
│ ├── updateUsage 行 2924 (69 行)
│ ├── accumulateUsage 行 2993 (47 行)
│ ├── isToolResultBlock 行 3040 (23 行)
│ └── addCacheBreakpoints 行 3063 (150 行)
│
├── G. System prompt(行 3213-3241)
│ └── buildSystemPromptBlocks 行 3213 (28 行)
│
├── H. 顶层 query 函数(行 3241-3364)
│ ├── queryHaiku 行 3241 (59 行)
│ └── queryWithModel 行 3300 (64 行)
│
└── I. 辅助(行 3364-3419)
├── adjustParamsForNonStreaming 行 3364 (30 行)
└── isMaxTokensCapEnabled 行 3394 (25 行)
2. 行 1-50:Imports —— 30+ 模块¶
import type {
BetaContentBlock, BetaContentBlockParam, BetaImageBlockParam,
BetaJSONOutputFormat, BetaMessage, BetaMessageDeltaUsage,
BetaMessageStreamParams, BetaOutputConfig, BetaRawMessageStreamEvent,
BetaRequestDocumentBlock, BetaStopReason, BetaToolChoiceAuto,
BetaToolChoiceTool, BetaToolResultBlockParam, BetaToolUnion,
BetaUsage, BetaMessageParam as MessageParam, TextBlockParam,
} from '@anthropic-ai/sdk/resources/...'
import { randomUUID } from 'crypto'
import { getAPIProvider, isFirstPartyAnthropicBaseUrl } from 'src/utils/model/providers.js'
import { getAttributionHeader, getCLISyspromptPrefix } from '../../constants/system.js'
// ... 25+ import
关键洞察:
- 全部从 @anthropic-ai/sdk import 30+ 个类型 —— 深度使用 SDK 的所有 Beta 能力(extended thinking、prompt caching、tool use、message batches、structured output、images、documents)
- as MessageParam 别名 —— 解决 SDK 内部类型命名
- getAPIProvider / isFirstPartyAnthropicBaseUrl —— 4 种 provider 适配(Anthropic / Bedrock / Vertex / claude.ai)
预测外部构建:@anthropic-ai/sdk 会被 bun bundle 内联(参考 bun:bundle)。
3. A 段:参数配置(行 272-530)¶
3.1 getExtraBodyParams(行 272-332,61 行)¶
export function getExtraBodyParams(betaHeaders?: string[]): JsonObject {
// 1. 解析 ANTHROPIC_CUSTOM_HEADERS
// 2. 解析 ANTHROPIC_EXTRA_BODY
// 3. 构造 structured output 字段
// 4. 构造 effort 字段
// 5. 构造 token budget 字段
// ... 合并返回
}
作用:从 env vars 提取自定义请求体参数,加到每个 API 请求。
为什么是 env vars: - 用户/企业可定制 - 不需要改代码
3.2 getPromptCachingEnabled(行 333-357,25 行)¶
export function getPromptCachingEnabled(model: string): boolean {
// 1. 默认对所有模型启用
// 2. 除非 model 在 DISABLE_PROMPT_CACHING_HAIKU/SONNET/OPUS 名单
// 3. 除非全局 DISABLE_PROMPT_CACHING
return true // 或 false
}
prompt cache 是 Anthropic API 的省钱特性: - 重复的 system prompt / 长上下文不重复计费 - Claude Code 默认启用
env vars 控制粒度:全局 + 按模型(haiku/sonnet/opus)。
3.3 getCacheControl(行 358-392,35 行)¶
export function getCacheControl({
type = 'ephemeral',
ttl,
}: {
type?: 'ephemeral' | 'persistent'
ttl?: '5m' | '1h'
}): { type: string; ttl?: string } {
// 返回 cache_control 字段
}
Anthropic API 的缓存控制:
- ephemeral —— 5 分钟过期(默认)
- persistent —— 1 小时过期
- 1h TTL 是 Beta 特性(should1hCacheTTL() 检查)
3.4 should1hCacheTTL(行 393-439,47 行)¶
function should1hCacheTTL(querySource?: QuerySource): boolean {
// 检查 query 来源是否支持 1h cache
// 1. 默认 false
// 2. enterprise / pro 用户开启
// 3. querySource 是 'batch' 之类长任务 → true
}
1 小时 cache 是付费特性,默认关闭。
3.5 configureEffortParams(行 440-478,39 行)¶
function configureEffortParams(body: JsonObject, effort: EffortValue): void {
// 设置 body.thinking = { type: 'enabled', budget_tokens: ... }
// 或 body.output_config = { effort: 'low' | 'medium' | 'high' }
}
模型思考强度控制:
- 'low' —— 快、便宜
- 'high' —— 慢、准
- Anthropic API 的 Beta 特性
3.6 configureTaskBudgetParams(行 479-502,24 行)¶
export function configureTaskBudgetParams(body, taskBudget) {
// 设置 token 预算
// body.budget_tokens = taskBudget.total
}
总 token 预算 —— 给整个任务设上限,不超支。
3.7 getAPIMetadata(行 503-529,27 行)¶
export function getAPIMetadata(): {
attribution: AttributionState
cost: number
duration: number
// ...
} {
// 从 cost-tracker / bootstrap.state 提取
// 注入到每个 API 请求的 metadata 字段
}
API metadata —— 客户端信息,Anthropic 用来分析哪些 client 在用什么模型、什么费用。
4. B 段:API 验证(行 530-588)¶
4.1 verifyApiKey(行 530-587,58 行)¶
export async function verifyApiKey(apiKey: string): Promise<{
valid: boolean
error?: string
}> {
// 1. 构造最小请求
// 2. 调 /v1/messages
// 3. 根据响应判断
// 4. 返回 valid / error
}
首次启动验证: - 调一次"hello" API - 检查 key 是否有效 - 失败时给用户友好提示("API key 无效,请检查 ~/.claude/settings.json")
配套:hooks/useApiKeyVerification.ts 在 REPL 启动时调。
5. C 段:消息格式转换(行 588-709)¶
5.1 userMessageToMessageParam(行 588-632,45 行)¶
export function userMessageToMessageParam(
msg: UserMessage,
options: { /* ... */ }
): MessageParam {
// 1. 转换 content(string → ContentBlock[])
// 2. 处理 attachments(图片 / PDF)
// 3. 处理 tool_result blocks
// 4. 处理 cache_control
// 5. 返回 SDK 格式
}
关键洞察:
- 内部类型 (UserMessage) → SDK 类型 (MessageParam)
- 附件、tool_result、cache 都在这里处理
- 配套:assistantMessageToMessageParam 是反向
5.2 assistantMessageToMessageParam(行 633-708,76 行)¶
export function assistantMessageToMessageParam(
msg: AssistantMessage,
options: { /* ... */ }
): MessageParam {
// 1. 转换 thinking blocks
// 2. 转换 text blocks
// 3. 转换 tool_use blocks
// 4. 处理 cache_control
}
76 行 > 45 行 —— 因为 assistant message 有 thinking、tool_use 两种 block。
6. D 段:流式/非流式入口(行 709-1017)¶
6.1 queryModelWithoutStreaming(行 709-751,43 行)¶
export async function queryModelWithoutStreaming(
messages: Message[],
systemPrompt: SystemPrompt,
context: ToolUseContext,
canUseTool: CanUseToolFn,
options: { /* ... */ }
): Promise<Message[]> {
// 1. 构造请求
// 2. 调 executeNonStreamingRequest
// 3. 返回完整 messages(含 assistant 回复)
}
非流式 —— 一次返回所有消息。
用途:SDK 模式、批量任务、resume。
6.2 queryModelWithStreaming(行 752-785,34 行)¶
export async function* queryModelWithStreaming(
messages: Message[],
// ...
): AsyncGenerator<StreamEvent, void, void> {
// 1. 构造流式请求
// 2. for await 消费 stream
// 3. 透传每个 event
}
流式 —— 异步生成器,逐 token 返回。
用途:REPL 实时显示。
6.3 shouldDeferLspTool(行 786-806,21 行)¶
function shouldDeferLspTool(tool: Tool): boolean {
// LSP 工具(auto-complete)应该在 LLM 调用后再执行
// 而不是和 LLM 调用并行
}
LSP(Language Server Protocol)工具的特殊处理。
6.4 getNonstreamingFallbackTimeoutMs(行 807-817,11 行)¶
function getNonstreamingFallbackTimeoutMs(): number {
// 流式调用超时 → 降级到非流式
// 超时时长
return API_TIMEOUT_MS // 默认 60s
}
降级机制 —— 流式失败时自动切非流式。
6.5 executeNonStreamingRequest(行 818-927,110 行)¶
export async function* executeNonStreamingRequest(
params: MessageStreamParams,
): AsyncGenerator<StreamEvent, void, void> {
// 1. client.messages.create({ ...params, stream: false })
// 2. 模拟流式输出(single event)
// 3. yield 单个完整消息
}
非流式 + 模拟流式 —— 内部统一用 AsyncGenerator<StreamEvent>,业务层不知道是流式还是非流式。
这是"统一接口"设计:
- 业务层只关心 for await
- 底层可选流式 / 非流式
- 降级零成本
7. E 段:⭐ queryModel 主函数(行 1017-2898,1881 行)¶
这是整个 claude.ts 最核心的函数。1881 行不是单一逻辑,是完整 LLM 调用的所有边界处理。
7.1 推测的整体结构¶
async function* queryModel(
messages: Message[],
systemPrompt: SystemPrompt,
context: ToolUseContext,
canUseTool: CanUseToolFn,
options: QueryOptions,
): AsyncGenerator<StreamEvent, void, void> {
// ─── 1. 准备阶段(约 200 行)───
// 1.1 构造请求参数
// 1.2 转换 messages → MessageParam[]
// 1.3 选择 provider(Anthropic / Bedrock / Vertex)
// 1.4 构造 client
// 1.5 准备 cache breakpoints
// 1.6 处理 system prompt blocks
// 1.7 处理 tools
// 1.8 处理 metadata / attribution
// 1.9 处理 thinking config
// 1.10 处理 effort params
// 1.11 处理 task budget
// ─── 2. API 调用(约 200 行)───
// 2.1 决定流式 / 非流式
// 2.2 调 client.messages.stream / .create
// 2.3 错误处理 + 重试
// ─── 3. 流解析(约 800 行)───
// 3.1 解析 message_start
// 3.2 解析 content_block_start(text / thinking / tool_use / image / document)
// 3.3 解析 content_block_delta(逐 token)
// 3.4 解析 content_block_stop
// 3.5 解析 message_delta(usage、stop_reason)
// 3.6 解析 message_stop
// ─── 4. 后处理(约 400 行)───
// 4.1 累加 usage
// 4.2 处理 cache breakpoints
// 4.3 处理 tool_use
// 4.4 处理 plan mode
// 4.5 处理 synthetic output
// 4.6 错误恢复
// ─── 5. 工具循环(约 200 行)───
// 5.1 如果 assistant 返回 tool_use → 调 canUseTool
// 5.2 权限检查
// 5.3 调工具
// 5.4 把 tool_result 加到 messages
// 5.5 递归 / 继续循环
// ─── 6. 收尾(约 80 行)───
// 6.1 cleanupStream
// 6.2 updateUsage
// 6.3 accumulateUsage
// 6.4 返回最终结果
}
7.2 关键设计点¶
7.2.1 1881 行的"巨型函数"是合理的吗?¶
乍看不合理,但 Claude Code 选这种结构有理由:
1. 真正的"流式循环"逻辑就是长
for await (const event of stream) {
switch (event.type) {
case 'message_start': /* 100 行状态处理 */ break
case 'content_block_start': /* 200 行不同 block 类型 */ break
case 'content_block_delta': /* 300 行 delta 累积 */ break
case 'content_block_stop': /* 150 行 block 收尾 */ break
case 'message_delta': /* 100 行 usage 处理 */ break
case 'message_stop': /* 80 行清理 */ break
}
}
switch 内部每个 case 都是 100-300 行,累加 1881 行。
2. 拆函数会损失 inline 优化的机会
V8 引擎对大函数有特殊优化(如 TurboFan 的 inlining)。 拆成 50 个小函数反而慢。
3. 业务逻辑高度耦合
流式事件的处理和 工具调用、cache、usage 处理深度交织。 强拆 = 大量参数传递。
4. 测试用 mock 而非拆分
业务测试用 vitest mock(vitest.mock('./Anthropic')),不需要拆函数。
7.2.2 queryModel 内部的关键 switch 块¶
switch (event.type) 处理 6 种 event,每种 100-300 行。
最复杂的:content_block_start —— 根据 block.type 分 5+ 种处理(text、thinking、tool_use、image、document、redacted_thinking)。
case 'content_block_start':
switch (event.content_block.type) {
case 'text':
// 启动 text block 累积器
break
case 'thinking':
// 启动 thinking block 累积器
break
case 'tool_use':
// 启动 tool_use block 解析
break
case 'image':
// 处理图片
break
case 'document':
// 处理 PDF
break
// ...
}
break
每个 case 内部:
- 启动累积器(currentBlock = { ... })
- 处理 cache control
- 处理 attribution
- 状态更新
7.2.3 工具循环(Tool Loop)¶
// 推测的代码
let iterCount = 0
while (true) {
iterCount++
if (iterCount > MAX_ITERATIONS) {
throw new MaxIterationsError()
}
// 1. 跑 LLM
for await (const event of queryModel(...)) {
yield event
if (event.type === 'tool_use') {
// 2. 调工具
const result = await canUseTool(event.tool, event.input, context)
// 3. 追加 tool_result 到 messages
messages = [...messages, result]
}
}
}
关键:LLM 和 tool 在同一个 async generator 里循环。 - 消费者看到的是流式 events - 业务上是 LLM → tool → LLM → tool 的循环
7.2.4 错误处理(在 queryModel 内部)¶
try {
for await (const event of stream) {
yield event
}
} catch (err) {
// 1. 分类错误
const category = categorizeRetryableAPIError(err)
if (category?.retryable) {
// 2. 退避后重试
yield* retryWithBackoff(messages, context, category)
} else if (category?.code === 'rate_limit') {
// 3. 降级到 fallback model
yield* fallbackToSecondaryModel(messages, context)
} else {
// 4. 抛给上层
throw err
}
}
"try / catch + 分类 + 重试 / 降级" 是 Claude Code 的标准模式。
8. F 段:后处理(行 2898-3213)¶
8.1 cleanupStream(行 2898-2923,26 行)¶
export function cleanupStream(stream: Stream): void {
// 1. abort stream
// 2. 释放 file handles
// 3. 清理 listeners
}
资源清理 —— AbortController 触发的流必须清理。
8.2 updateUsage(行 2924-2992,69 行)¶
export function updateUsage(
state: AppState,
usage: BetaUsage,
cost: number,
duration: number,
): AppState {
// 1. 累加 input_tokens / output_tokens
// 2. 累加 cost
// 3. 累加 duration
// 4. 返回新 state
}
Usage 累计 —— 每次 API 响应都更新 state。
8.3 accumulateUsage(行 2993-3039,47 行)¶
export function accumulateUsage(
a: NonNullableUsage,
b: NonNullableUsage,
): NonNullableUsage {
// 字段加法
return {
input_tokens: a.input_tokens + b.input_tokens,
output_tokens: a.output_tokens + b.output_tokens,
cache_creation_input_tokens: ...,
cache_read_input_tokens: ...,
cost_usd: a.cost_usd + b.cost_usd,
}
}
纯函数 —— 把两个 usage 累加。配合 updateUsage 用。
8.4 addCacheBreakpoints(行 3063-3212,150 行)¶
export function addCacheBreakpoints(
messages: MessageParam[],
options: { /* ... */ }
): MessageParam[] {
// 1. 找到 system prompt
// 2. 在 system prompt 末尾加 cache_control
// 3. 找到 messages 中合适的"分界点"
// 4. 在分界点加 cache_control
// 5. 返回处理后的 messages
}
150 行的 cache 策略 —— prompt cache 不是简单加标记,有算法: - 太靠前 → 缓存命中率低 - 太靠后 → 缓存太大 - 最优分界点 = 滚动的"过去 N 条消息"边界
9. G 段:System prompt(行 3213-3241)¶
9.1 buildSystemPromptBlocks(行 3213-3240,28 行)¶
export function buildSystemPromptBlocks(
systemPrompt: string | ContentBlock[],
tools: Tools,
context: ToolUseContext,
): ContentBlock[] {
// 1. 加 CLI prefix
// 2. 加 attribution
// 3. 加 cache control
// 4. 加自定义 system prompt
// 返回 ContentBlock[]
}
构造 system prompt —— 包含工具描述、attribution、cache 标记。
10. H 段:顶层 query 函数(行 3241-3364)¶
10.1 queryHaiku(行 3241-3299,59 行)¶
export async function queryHaiku(prompt: string): Promise<string> {
// 1. 调 small model(haiku)做轻量任务
// 2. 用于 classifier、摘要等
// 3. 不用 tools、很小 token 预算
}
Haiku 专门函数 —— 用于轻量任务(不是主 LLM 循环)。 典型用途: - 压缩时的摘要 - 风险分类 - 提取关键信息
10.2 queryWithModel(行 3300-3363,64 行)¶
export async function queryWithModel({
model,
messages,
systemPrompt,
maxTokens,
}): Promise<string> {
// 1. 用指定 model(opus / sonnet / haiku)调一次
// 2. 非流式
// 3. 返回纯文本
}
非流式、纯文本、单 model —— SDK 模式最常用。
11. I 段:辅助(行 3364-3419)¶
11.1 adjustParamsForNonStreaming(行 3364-3393,30 行)¶
function adjustParamsForNonStreaming(params): MessageStreamParams {
// 调整 max_tokens / thinking 预算(流式 vs 非流式不同)
}
11.2 isMaxTokensCapEnabled(行 3394-3418,25 行)¶
function isMaxTokensCapEnabled(): boolean {
// 检查 CLAUDE_CODE_MAX_OUTPUT_TOKENS env var
return process.env.CLAUDE_CODE_MAX_OUTPUT_TOKENS !== undefined
}
Token 上限 —— 用户可设。
12. 关键洞察¶
12.1 "claude.ts 是 3419 行的工具包装"的意义¶
Anthropic SDK 已经提供了: - 流式 / 非流式 API - 类型定义 - 错误类
Claude Code claude.ts 加了: - 4 种 provider 适配 - Prompt cache 策略 - Usage 累计 - 错误分类 + 重试 - System prompt 构造 - Tool loop 集成 - 30+ env vars / feature flag 控制
包装价值巨大 —— 业务层只需要 for await (const e of queryModel(...)) 即可。
12.2 1881 行 queryModel 的设计哲学¶
为什么不拆: - V8 inline 优化 - 业务逻辑强耦合 - 测试用 mock
为什么不重构: - 注释解释了每个分段("准备"、"API 调用"、"流解析"、"后处理"、"工具循环") - 未来维护者不需要考古
12.3 "流式 / 非流式"双实现 + 统一接口¶
async function* executeNonStreamingRequest(params): AsyncGenerator<StreamEvent> {
// 模拟流式
const response = await client.messages.create({ ...params, stream: false })
yield { type: 'message_start', ... }
yield { type: 'content_block_start', ... }
yield { type: 'content_block_delta', text: response.text }
yield { type: 'message_stop', ... }
}
业务层不区分: - REPL 用真流式(实时显示) - SDK 用非流式 + 模拟(一次返回) - 切换零成本
12.4 Anthropic SDK 的 30+ 类型都用到了¶
BetaMessageDeltaUsage—— 流式 usageBetaRequestDocumentBlock—— PDF 文档BetaToolChoiceAuto/BetaToolChoiceTool—— 工具选择BetaJSONOutputFormat—— 结构化输出BetaStopReason—— 停止原因- 等等
"吃透 SDK"是 Claude Code 的核心能力。
13. 阅读清单¶
- ✅ 完整通读
src/services/api/claude.ts(3419 行) - ✅ 读 phase-06-agent-loop.md 配合
- 📌 读
@anthropic-ai/sdk的 messages.mjs 类型 - 📌 读
src/services/api/errors.ts(错误分类) - 📌 读
src/services/api/withRetry.ts(重试)
14. 练习任务¶
- 数
queryModel内的 switch case 数量 —— 6 种 event × 各自 5-10 sub-case - 找出所有 "early return" 路径 —— queryModel 在什么情况下提前 return?
- 设计你自己的 "API client wrapper" —— 给一个 200 行的 TypeScript 函数包装 Anthropic SDK,支持流式 / 非流式 / 错误重试
- 思考:1881 行的
queryModel是不是"坏味道"?如果是,怎么拆?如果不是,为什么?