第 17 章 在线评测平台:langsmith、langfuse、phoenix 架构对比

“Logs without context are noise; traces without evals are sightseeing.” —— 一条流传于 Anthropic Discord 的工程格言

本章要点

  • 在线评测平台的工程职责:trace 收集、采样、评分、可视化、告警
  • 三家代表平台的定位差异:langsmith(LangChain 生态)/ langfuse(开源自托管)/ phoenix(OTel 原生)
  • Trace 数据模型:spans、attributes、events 的标准结构
  • 在线 grader 的两种部署模式:sidecar vs 异步 worker
  • 平台选型决策树:6 维度评估 + 切换成本

17.1 为什么需要专门的在线评测平台

第 2 章 §2.3 已经讨论过在线评测的核心难题——生产流量没有 ground truth,必须用 LLM-as-Judge 实时打分 + 用户隐式反馈 + 抽样人工标注三管齐下。但要把这一套跑成生产系统,还需要回答一系列工程问题:

flowchart TB
  Q[在线评测的工程难题] --> Q1[Trace 怎么收集<br/>侵入业务代码到什么程度]
  Q --> Q2[采样策略<br/>1% 还是 100%]
  Q --> Q3[判分异步 / 同步]
  Q --> Q4[大流量下的存储<br/>每天 GB-TB 级数据]
  Q --> Q5[隐私 / PII 处理]
  Q --> Q6[Dashboard 与告警]
  Q --> Q7[人工标注闭环]

自建一套覆盖以上 7 件事的系统约需 1-2 工程团队 × 半年。这就是为什么从 2023 年起 langsmith / langfuse / phoenix 等专业平台兴起——把这些工程负担集中到平台层。

17.2 三家平台的定位差异

三家平台的位置可以用两个轴粗略定位:

平台协议生态绑定
langsmith商业 SaaS与 LangChain / LangGraph 深度集成
langfuseMIT 开源框架中立,自托管为主
phoenixApache 2 开源OpenTelemetry 原生,与可观测栈无缝

三家的简短画像:

  • langsmith(LangChain 团队商业产品):与 LangChain / LangGraph 深度耦合,trace 自动捕获,eval / dataset 一体化
  • langfuse(开源自托管首选):MIT 协议、Postgres 后端、可自部署,集成中立(不绑定 LangChain)
  • phoenix(Arize AI 开源):完全 OpenTelemetry 原生,最大优势是与现有 OTel 基础设施无缝接入

工业团队不应该只看”哪个最强”,而是看”哪个最匹配现有技术栈”。下面分别看三家的工程细节。

17.3 langsmith:trace + eval + dataset 三位一体

17.3.1 数据模型

langsmith 的核心数据单元是 Run(一次完整 LLM 调用,可嵌套):

from langsmith import Client

client = Client()

# 自动 trace LangChain 链调用
@traceable(name="customer-support-rag")
def my_rag(query: str):
    contexts = retriever.invoke(query)
    answer = llm.invoke(prompt.format(query=query, context=contexts))
    return answer

# 手工 trace
with client.run_helpers.trace("custom-step", inputs={"q": q}) as run:
    result = ...
    run.end(outputs={"r": result})

每个 Run 自动捕获:

  • 输入 / 输出(完整 JSON)
  • 子 Run 嵌套(形成 trace tree)
  • 模型 / 参数 / token 用量 / 延迟
  • 自定义 metadata(user_id / session_id 等)
  • 自动关联 dataset 与 evaluation

17.3.2 评测一体化

langsmith 把”在线 trace”和”离线 dataset 评测”拉到同一界面:

from langsmith.evaluation import evaluate

results = evaluate(
    target=my_rag,                 # 被测函数
    data="customer-support-v1",    # langsmith 上的 dataset
    evaluators=[
        faithfulness_evaluator,    # 自定义 grader
        cost_evaluator,
    ],
    experiment_prefix="prompt-v3",
)

跑完产出 dashboard:每条样例的 trace + 多个 evaluator 的得分 + 与 baseline 对比。这种”trace、dataset、eval 一体化”是 langsmith 最被认可的工程价值。

17.3.3 Online Evaluator

langsmith 支持把 evaluator 设为”在线模式”——所有生产 trace 自动跑评分:

from langsmith.evaluation import OnlineEvaluator

OnlineEvaluator(
    name="prod-faithfulness",
    sampling_rate=0.1,        # 10% 采样
    evaluator=faithfulness_evaluator,
).deploy()

部署后所有生产 trace 都被自动评分、聚合曲线打到 dashboard、超阈值触发 webhook 告警。这就是第 2 章 §2.3 在线评测在工程上的标准形态。

17.4 langfuse:开源自托管首选

17.4.1 架构特点

langfuse 是 MIT 协议开源、可以自托管的方案。整体架构:

flowchart LR
  App[业务应用] -->|SDK| API[langfuse API Server<br/>Node.js]
  API --> PG[(Postgres)]
  API --> CH[(ClickHouse<br/>大流量优化)]
  PG --> UI[langfuse Web UI]
  CH --> UI
  API --> Q[评测队列]
  Q --> W[Worker<br/>跑 LLM-judge]
  W --> CH
  style API fill:#dbeafe
  style UI fill:#dcfce7

工程亮点:

  • API Server / Worker 解耦:业务应用只往 API 推 trace,judge / scoring 在异步 worker 里跑。生产流量与评测开销解耦
  • Postgres + ClickHouse 双库:Postgres 存元数据(用户、配置、dataset)、ClickHouse 存 trace 大流水(按列存储优化分析查询)
  • 完全开源 SDK:JavaScript / Python / Go / Java 都有官方 SDK,集成中立

17.4.2 自托管的工程价值

对中国团队 / 数据合规要求高的团队,自托管几乎是唯一选项——生产 prompt 含 PII 不能上美国 SaaS。langfuse 自托管 docker-compose 一键起,运维门槛低。

工业实测:单机 langfuse(4C16G + 200GB SSD)能支撑 100 万 trace / 天。再大流量需要 ClickHouse 集群 + 多 Worker,成本仍比 langsmith 商业版便宜一个量级。

17.5 phoenix:OpenTelemetry 原生

17.5.1 OTel 优先的设计哲学

phoenix(Arize AI 开源)选择完全押注 OpenTelemetry:

from openinference.instrumentation.openai import OpenAIInstrumentor
from phoenix.otel import register

# phoenix 接收 OTel trace
tracer_provider = register(project_name="my-rag-app")
OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)

# 之后所有 OpenAI SDK 调用自动 trace 到 phoenix

OTel-first 的工程意义:

  • 零侵入:业务代码不需要改,OTel auto-instrumentation 接管所有
  • 统一 stack:与 Datadog / Honeycomb / Jaeger 等 OTel 后端可以并行接入
  • 跨语言:OTel 是行业标准,所有主流语言都有支持

17.5.2 trace 的标准 OTel 模型

phoenix 严格遵循 OTel 的 spans + attributes + events 模型:

trace
├── span "rag-pipeline"
│   ├── span "retriever.invoke"
│   │   ├── attribute: input.value
│   │   ├── attribute: output.contexts
│   │   └── event: retrieval.completed
│   └── span "llm.generate"
│       ├── attribute: llm.model_name
│       ├── attribute: llm.token_count.prompt
│       ├── attribute: llm.token_count.completion
│       └── attribute: output.value

这个标准化让 phoenix 的 trace 能被任何 OTel 工具消费,反过来也能消费任何 OTel 工具产生的 trace。这是 langsmith / langfuse 都不具备的工程优势。

17.6 三家平台的工程对比

维度langsmithlangfusephoenix
协议商业 SaaSMIT 开源Apache 2 开源
自托管部分支持(企业版)一键 docker-composedocker / pip install
默认捕获LangChain 自动手动 SDK 或 OTelOTel auto-instrument
评测一体化一流中等一般
开放性LangChain 生态中立OTel 中立
大流量后端自有云ClickHousePostgres + 内置 OLAP
中国数据合规否(美国 SaaS)自托管解决自托管解决
价格$39+/seat/月免费 self-host免费 self-host

17.7 在线 grader 的两种部署模式

flowchart TB
  subgraph Sidecar 模式
    A1[业务应用] -->|每次调用| G1[Grader sidecar]
    G1 -->|实时打分| S1[scoring DB]
  end
  subgraph 异步队列模式
    A2[业务应用] -->|push trace| Q[Trace Queue]
    Q --> W[Grader Worker<br/>异步消费]
    W --> S2[scoring DB]
  end
  style G1 fill:#fee2e2
  style W fill:#dcfce7

两种模式各有取舍:

  • Sidecar:评测延迟低(同步可见),但每次业务调用都要等 grader → 生产链路被评测拖慢
  • 异步队列:业务零阻塞,但评测结果有几秒到几分钟延迟、不能用于实时决策

工业实践绝大部分用异步队列——除非你的业务真的需要”评测分进入实时回答的决策”(如 best-of-K 选最优)。phoenix / langfuse / langsmith 默认都是异步模式。

17.8 选型决策树

flowchart TD
  Start[在线评测平台选型] --> A{已用 LangChain<br/>/LangGraph?}
  A -->|是| B{有数据出境合规?}
  A -->|否| C{已有 OTel 基础设施?}
  B -->|否| L[langsmith]
  B -->|是| LF1[langfuse 自托管]
  C -->|是| P[phoenix]
  C -->|否| D{需要 evaluator<br/>一体化?}
  D -->|是| LF2[langfuse]
  D -->|否| P2[phoenix]
  style L fill:#dbeafe
  style LF1 fill:#dcfce7
  style LF2 fill:#dcfce7
  style P fill:#fef3c7
  style P2 fill:#fef3c7

5 个分支的具体选型逻辑:

  • LangChain 重度 + 无合规约束 → langsmith(一体化最强)
  • LangChain 重度 + 数据不出境 → langfuse 自托管(兼容性优先)
  • 已有 OTel → phoenix(与现有可观测栈无缝)
  • 一般场景需要 evaluator 一体化 → langfuse
  • 一般场景只要 trace → phoenix

切换成本:langsmith ↔ langfuse 大约 1 周(API 形态相近)、langfuse ↔ phoenix 大约 1-2 周(要改 OTel 适配)。所以最初选型很重要。

17.9 隐私与合规:三个工程要点

无论选哪家平台,PII / 合规处理都是工程团队自己的责任:

17.9.1 PII 自动遮罩

在 SDK 层做 PII 检测 + 遮罩:

import re

def redact_pii(text: str) -> str:
    text = re.sub(r"\b\d{11}\b", "[PHONE]", text)         # 手机号
    text = re.sub(r"\b\d{18}\b", "[ID]", text)             # 身份证
    text = re.sub(r"\b[\w.-]+@[\w.-]+\.\w+\b", "[EMAIL]", text)
    return text

# 在 trace 推送前调用
client.create_trace(
    inputs={"query": redact_pii(user_query)},
    outputs={"answer": redact_pii(model_answer)},
)

这一步必须在数据离开应用进程之前完成——一旦上传到云平台再脱敏,已经为时已晚。

17.9.2 数据保留期

GDPR / CCPA / 中国个人信息保护法都有”数据最小保留”原则。三家平台都支持配置 trace 自动过期(默认 90 天,可调)。生产环境必须显式配置,不能用默认。

17.9.3 用户撤回 / 数据删除

GDPR 的”被遗忘权”要求按 user_id 批量删除 trace。三家平台 API 都支持,但需要你的应用层确保 trace 都带 user_id 标签——这是设计 instrumentation 时就要考虑的事。

17.10 一份完整的在线评测落地方案

整合本章方法成一份新项目的工程清单:

[ ] Week 1: 选型 + POC
    [ ] 按 §17.8 决策树选 1 个平台
    [ ] 起一个 dev 环境跑通 trace 收集
    [ ] 在 100 条样例上验证 evaluator 工作

[ ] Week 2: 集成主链路
    [ ] SDK 集成到生产代码
    [ ] PII 遮罩 (§17.9.1)
    [ ] 设 1% 采样率 → 观察存储成本

[ ] Week 3: Evaluator
    [ ] 部署 Faithfulness / Hallucination Rate 在线 grader
    [ ] 配 dashboard
    [ ] 设阈值 + Slack 告警

[ ] Week 4: Hard Case Mining
    [ ] 接 thumbs-up/down 反馈
    [ ] 每周 review 低分 trace, 反哺离线集
    [ ] 与第 3 章 §3.6 闭环

[ ] Month 2: 元评测 + 优化
    [ ] 每月跑 §8.6.6 元评测仪式
    [ ] 调整采样率、grader 选型

这份清单从零到生产 30 天能完成。Week 1-2 是最关键的——选错平台或集成不彻底,后面所有工作都被卡住。

17.10.5 OpenInference:跨平台 trace 标准的雏形

OpenTelemetry 定义了”通用 trace 协议”,但 LLM 应用的 trace 需要更多专属语义——prompt、completion、token usage、tool call 这些字段。OTel 本身不规定如何用 attribute 表达它们。

OpenInference 是 Arize 团队推动的一份”LLM 专属 OTel 语义约定”。它定义了一组标准 attribute:

llm.input_messages.0.role         = "user"
llm.input_messages.0.content      = "..."
llm.output_messages.0.role        = "assistant"
llm.output_messages.0.content     = "..."
llm.model_name                    = "gpt-4o"
llm.token_count.prompt            = 234
llm.token_count.completion        = 156
llm.invocation_parameters         = '{"temperature": 0.7, "max_tokens": 500}'
retrieval.documents.0.document.content = "..."
retrieval.documents.0.document.score   = 0.91
tool.json_schema                  = "{...}"
tool.name                         = "search_database"

这套约定的工程价值:在 phoenix / langfuse / langsmith / Datadog 之间无缝迁移 trace。同一份 OTel 数据,可以同时被多个平台消费。

2025 年起 LangChain、LlamaIndex、AutoGen、Anthropic SDK 都内置了 OpenInference 支持。这意味着工程团队不必绑定单一平台——埋点用 OpenInference 标准、平台层可替换。

工业团队即使现在用 langsmith,也建议把内部 trace 抽象层做成 OpenInference 兼容——为未来切换 / 多平台并行留口子。

17.10.6 Trace 存储成本优化的工程套路

第 2 章 §2.3.4 给过采样策略的数学。这里补充存储层的具体优化,因为生产规模下 trace 存储成本可能比 LLM API 调用费还高:

flowchart TB
  Raw[每天原始 trace<br/>~100GB-1TB] --> S{优化层}
  S --> S1[1. 字段裁剪<br/>大 prompt content 不全存]
  S --> S2[2. 分级保留<br/>失败 trace 90 天, 成功 7 天]
  S --> S3[3. 列存压缩<br/>ClickHouse / Parquet]
  S --> S4[4. 冷热分层<br/>近 7 天 SSD, 老的 S3]
  S --> S5[5. 采样存储<br/>关键路径 100%, 长尾 1%]
  S1 --> Save[降本 50-80%]
  S2 --> Save
  S3 --> Save
  S4 --> Save
  S5 --> Save
  style Save fill:#dcfce7

每条具体做法:

  1. 字段裁剪:保留 metadata、user_id、scores,把超长 prompt content 用 hash + S3 引用代替(除非 trace 被标记为 hard case)
  2. 分级保留:失败 / 低 score 的 trace 保留 90 天供根因分析;正常 trace 7 天后归档
  3. 列存压缩:ClickHouse / Parquet 按列存储 + zstd 压缩,文本类字段压缩率常达 5-10x
  4. 冷热分层:近期(7 天内)SSD 高速访问;30+ 天的归档到 S3 / OSS
  5. 关键路径全采样、长尾低采样:高价值场景(账户、支付)100%、长尾 1%

这五条组合,能把 trace 存储成本压到原来 20-30%——足够让评测体系长期可持续。

17.10.7 三家平台的 evaluator 部署模式对比

一个常被忽视的差异——三家平台怎么把 grader 部署到生产。这决定了开发者迭代评测的便利度:

平台部署模式改动 grader 后多久生效离线 / 在线 grader 共用度
langsmithUI 配置 + python decorator立即(push 即生效)高(同一函数)
langfuseUI 配置 / API 配置立即中(需要适配两套 SDK)
phoenixOTel evaluator重启 service低(OTel 与 Python evaluator 是不同抽象)

langsmith 在这点上有明显优势——它的 evaluator 就是普通 Python 函数 @evaluator 装饰一下:

@evaluator
def faithfulness(run, example):
    answer = run.outputs["answer"]
    context = run.inputs["context"]
    score = call_judge(answer, context)
    return {"score": score, "key": "faithfulness"}

同一个函数既可以在离线 evaluate(...) 里用,也可以注册成 OnlineEvaluator 给生产流量打分——开发者不需要写两套。

langfuse 也支持 evaluator 但形态稍重,需要写完整的 trace listener。phoenix 因为 OTel 原生设计,evaluator 跟 OTel processor 绑定,迭代节奏明显慢于前两家。

工业团队选型时,把”evaluator 迭代节奏”列入决策树:

  • 团队对 prompt / grader 高频迭代(每周改动)→ langsmith
  • 节奏中等 + 自托管要求 → langfuse
  • evaluator 相对稳定 + 强 OTel 栈 → phoenix

17.10.8 一个常被遗忘的细节:trace 的 schema 演化

trace 数据有一个看似无关紧要、实际影响深远的工程问题:schema 演化

随着业务发展,你的 trace 字段会扩展——新增 user_id、新增 session_id、新增 LLM provider 标签等。这些字段一旦写入数据库,再想 query 历史数据就会遇到:

查询 6 个月前的数据 → 当时的 trace 没有 provider 字段 → query 失败 / 默认值不对

三家平台的应对:

  • langsmith:metadata 字段自由,每个 Run 自带任意 dict,无 schema 约束。但代价是无法跨时间统一 query
  • langfuse:用 ClickHouse 的 Map 类型存自定义字段,加字段不需要 migrate。但 query 性能比定字段稍差
  • phoenix:依赖 OTel attributes,OTel 标准本身允许任意字段 — 但跨平台兼容时需要遵循 OpenInference 命名约定(详见 §17.10.5)

工业实践:

  1. 把核心字段(user_id、session_id、provider、model_name、cost、latency)从第一天就规范化
  2. 业务相关字段放 business_metadata,约定可演化
  3. 每季度审查一次 trace schema,标记 deprecated 字段 6 个月后清理

这种”schema 治理”是评测平台运维的隐藏工作量——很多团队上线时不重视,半年后被”想做新分析却查不到老数据”卡住。

17.10.9 三家平台的 dataset / experiment / human feedback 集成模式

平台不止做 trace + grader,还要把 dataset / experiment / human feedback 三个工件统一管理。这是把”在线评测”和”离线评测”打通的最后一公里。

flowchart LR
  Trace[在线 trace] -->|标注| HF[Human Feedback]
  HF -->|入数据集| DS[Dataset]
  DS -->|跑实验| Exp[Experiment]
  Exp -->|对比| Trace
  style HF fill:#fef3c7
  style DS fill:#dcfce7
  style Exp fill:#dbeafe

三家的支持差异:

  • langsmith:dataset / experiment / feedback 三件事一体化,UI 流畅,跨工件关联做得最好
  • langfuse:三件事都支持但 UI 相对独立,跨工件跳转需要点几次
  • phoenix:dataset 与 experiment 较弱,主要靠 OTel data + Python notebook 做实验

工业团队的判断:如果你的工作流是 “线上 trace → 选低分 trace → 标注 → 入集 → 离线实验 → 上线” 这个循环,langsmith 的体验明显胜出。其他两家适合工作流相对独立的场景。

17.10.10 自建 vs 用平台:一个被低估的技术债

很多团队的初期会想”先自己搭一套简单 trace 系统,等以后再迁移到 langsmith / langfuse”。这种判断在大多数情况下是错的——自建的”简单 trace”会迅速演化成不可维护的技术债

自建的演化轨迹通常是:

Week 1: 一个 print + 写 jsonl 文件
Week 4: 加 SQLite, 简单 SQL 查询
Week 12: SQLite 扛不住了, 上 Postgres
Month 6: 想做 trace 嵌套 → 重构数据模型
Month 9: 想加 grader → 写一套 evaluator 抽象
Month 12: 想加 dashboard → 接 Grafana, 维护成本巨大
Year 2: 想加自动告警 → ... 重写一半组件
Year 3: 团队成员变动, 没人懂这套自建系统了

每一步都是合理的演化,但回头看,团队相当于花 2 年时间重新发明了 langsmith / langfuse。同样这 2 年时间,如果直接用平台 + 把团队精力放在业务评测上,价值产出至少高 10 倍。

例外情况——值得自建的场景:

  • 极端规模(每天 10 亿 trace):langsmith / langfuse 都不够
  • 极端合规:政务 / 军工等场景,开源平台都不能用
  • 公司有专职可观测性团队:自建有人维护,能持续投入

如果不在以上三种情况,直接用平台是几乎所有团队的最优选择。这个判断在 2026 年比 2023 年更明确——三家平台都已成熟,自建的相对优势在缩小。

17.10.11 一个工程实战:trace 数据如何驱动 prompt 优化

把 trace 数据用起来不只是”看 dashboard”。下面是一个具体的 prompt 优化工作流:

flowchart TB
  T[1k+ 生产 trace] --> S{按 score 分桶}
  S -->|低分| L[低分 trace 池]
  S -->|高分| H[高分 trace 池]
  L --> A1[抽取共性失败模式<br/>用 LLM 聚类]
  H --> A2[抽取共性成功模式]
  A1 --> Hyp[假设: prompt 在哪些场景失败]
  A2 --> Hyp
  Hyp --> NewP[改写 prompt v2]
  NewP --> Test[在原 trace 池上回放<br/>看新 prompt 表现]
  Test --> Compare[对比 v1 / v2 指标]
  Compare -->|v2 更好| Deploy[上线]
  Compare -->|不如 v1| Iter[再迭代]
  style Hyp fill:#fef3c7
  style Deploy fill:#dcfce7

关键步骤:

  1. 按 score 分桶:langsmith / langfuse 都支持按 score / feedback 筛选 trace
  2. LLM 聚类:用 GPT-4 把低分 trace 的失败原因聚成 5-10 类(自动归纳”为什么挂了”)
  3. 假设驱动:基于聚类结果提出 prompt 改进假设
  4. 回放验证:在原 trace 池上用新 prompt 重跑(不需要新拉用户流量)
  5. 指标对比:v1 vs v2 的 paired comparison

这种”trace 驱动 prompt 优化”的工作流,比”工程师拍脑袋改 prompt”高效 5-10 倍。它把”prompt engineering” 从”猜”升级成”基于数据的工程优化”——这正是评测体系给团队带来的能力升级。

17.10.12 一份选型 cheatsheet:3 分钟做决定

读完三家平台的细节后,给一份 3 分钟做决定的 cheatsheet:

你的情况选谁主要理由
已经用 LangChain / LangGraph + 海外 SaaS 可接受langsmith一体化最优、零搭建
中国数据合规 + 团队懂 dockerlangfuse自托管开源、中文社区活跃
已有 OTel 基础设施(Datadog / Honeycomb / Grafana)phoenixOTel 原生、与现有栈无缝
个人开发者 / 极小团队langfuse 自托管 + 单机零成本起步
大企业 / 强合规 / 数据敏感langfuse 私有部署完全自主 + 合规
学术研究 / 论文实验phoenix + Python notebook灵活、与 ML 工具链接合
多 LLM provider 组合(OpenAI/Claude/本地)phoenix(OTel 中立)不绑生态

无论选哪家,最重要的是从第一天就把 trace 接上——晚做几个月会损失大量 hard case 数据。这种”先有再说”的迭代节奏比”选完美工具”更重要。

17.10.13 一个 2026 年的新趋势:评测 / 监控 / 实验融合

LLM 工程领域正在出现一个新趋势——评测、监控、实验三件事正在融合到同一个工具

传统拆分:

  • 评测:promptfoo / ragas — 离线判分
  • 监控:Datadog / Prometheus — 生产 metric
  • 实验:Optimizely / 自建 A/B 框架 — 用户分组

LLM 时代这三件事的边界模糊:

  • 在线评测就是监控(指标曲线)
  • A/B 测试与离线评测共享判分逻辑
  • 用户反馈是评测信号也是 A/B 决策依据

LangSmith / Langfuse / Phoenix 都在向”统一三件事”演化。LangSmith 的 experiment + dashboard + feedback 已经一体化;Langfuse 在迭代中加 A/B testing 能力;Phoenix 与 Arize 商业版做实验管理。

工业团队的取舍:长期看应该选向”统一融合”方向投入的平台,避免后期被迫维护三套独立工具。这是 2026-2027 年评测平台选型时一个重要的”未来兼容性”考量。

17.10.14 一个工业现实:自建评测平台的失败案例(化名讨论)

读完三家平台的对比后,仍有团队倾向于自建。给一个公开报道里的反例(综合 LinkedIn 多家公司技术博客提取,化名讨论):

某中等规模 AI 创业公司(约 30 人)在 2023 年决定自建评测平台。他们的预期:

  • 节省 SaaS 费(langsmith 当时月度 ¥3 万)
  • 完全控制数据流向
  • 内部需求快速迭代

实际发生(来自他们后续技术博客复盘):

  • 第 1 季度:搭基础 trace 系统,2 人专职——成本约 ¥30 万
  • 第 2 季度:扩展评测器,发现自己 LLM-judge 调用不稳定——再投 1 人
  • 第 3 季度:要做 PR Quality Gate 集成,发现自建系统不支持—花 3 人月重构
  • 第 4 季度:核心团队成员离职,新人不懂这套自建系统——技术债爆发

到第 5 季度团队最终决策:弃用自建,迁移到 langsmith。整个过程花了约 ¥150 万 + 6 人季工时,且生产里有 1 个月评测体系空窗期(迁移过渡)。

教训:评测平台不是核心业务。除非有极强的合规 / 规模理由,自建几乎一定亏损。这个反例不是个别现象——LangSmith / Langfuse 的销售周期里这类”自建失败 → 迁移过来”的客户占了相当比例。

工业团队的判断标准:你们团队的核心竞争力是什么?如果不是”评测平台”——那就买 / 用开源 SaaS,把工程时间花在自家业务上。

17.10.15 一个常被低估的能力:Replay

LangSmith / Langfuse 等平台都支持 trace replay——把过去某条 trace 的输入”重放”过当前最新的 LLM / prompt / chain,看输出是否变化。

工程价值:

  1. 回归测试:模型版本升级时,把昨天 1000 条 trace 重放一遍,对比指标变化
  2. prompt 改动验证:改了 prompt 后用历史 trace 验证有没有破坏过去能解决的问题
  3. A/B 验证:不需要等真实用户,用历史 trace 模拟新 prompt 的效果
from langsmith import Client

client = Client()
old_traces = client.list_runs(
    project_name="my-app",
    filter="created_at >= '2026-04-01'",
    limit=1000,
)

new_results = []
for trace in old_traces:
    new_output = my_new_chain.invoke(trace.inputs)
    new_results.append({
        "input": trace.inputs,
        "old_output": trace.outputs,
        "new_output": new_output,
    })

# 用同一个 grader 评估新旧输出
old_scores = [grader(r["input"], r["old_output"]) for r in new_results]
new_scores = [grader(r["input"], r["new_output"]) for r in new_results]

# 对比
print(f"Old avg: {mean(old_scores):.3f}")
print(f"New avg: {mean(new_scores):.3f}")
print(f"Improved cases: {sum(n > o for o, n in zip(old_scores, new_scores))}")

这种”用历史数据评测未来变更”的能力,让 prompt / model 升级几乎不需要等真实用户验证——开发阶段就能拿到定量证据。这是评测体系给迭代速度的核心红利之一。

工业团队的实践:每次 prompt 改动 PR 都跑一遍”过去 7 天 trace 的 replay”——20 分钟跑完、得到指标变化报告、附在 PR 评论里。这种”replay 即评测”的范式,是 2025 年起平台主导的工程升级。

17.10.16 一个工程问题:Trace 的”原数据” vs “脱敏后数据”

trace 系统有一个看似细节但影响深远的工程决策——保留原数据还是脱敏后数据

两套方案:

方案 A:原数据 + 访问层脱敏

trace 入库时: 完整原文(含 PII)
访问 API 时: 按调用者权限动态脱敏

方案 B:入库前脱敏

trace 入库时: 已经脱敏(PII 替换为 token)
访问 API 时: 直接返回(无 PII)

权衡:

维度方案 A方案 B
原始还原能力强(必要时管理员能看完整)弱(已经丢失原文)
隐私风险高(数据泄露 = PII 泄露)低(泄露的也是脱敏后)
合规配合弱(GDPR 数据删除请求难处理)强(已无 PII)
调试便利中(脱敏可能影响排查)

国内多数团队选方案 B 因为合规风险更低;海外团队部分选 A 因为能保留更多调试能力。

工程实务:关键合规场景必须方案 B(医疗 / 金融 / 政务);非高合规场景方案 A + 强访问控制可接受。这个决策一旦做了很难反悔——半年后想从 A 切到 B,要重新处理已存的几 TB trace 数据,工程量极大。

17.10.17 一个团队的 trace 系统总成本估算

汇总 17 章方法,给一份典型团队 trace 系统的年度总成本(中等规模团队、月调用量 500 万 trace):

项目年成本
平台 SaaS 费 / 自托管资源¥50-200 万
存储成本(90 天热 + 1 年冷)¥10-30 万
Online judge LLM 调用费¥30-60 万
Engineer 维护时间(1 FTE × 30%)¥30-50 万
合规审计 / PII 处理工具¥5-20 万
总计¥125-360 万/年

这个数字看起来吓人,但比较”评测体系缺失导致的事故损失”(千万级)便宜得多。中等规模团队应把 trace 系统视为”必要基础设施”而非”可选锦上添花”。

成本优化方向:

  • 采样率从 100% 降到 1-5%:是最大的杠杆(成本降 95%+)
  • 平台从商业 SaaS 切到开源自托管:能省 30-50%
  • judge 模型从 Claude Sonnet 降到 Haiku 或本地 Llama:能省 50-80%

这些优化能把 trace 系统的总成本压到 ¥50-100 万 / 年——对中等规模团队是可承受范围。

17.10.18 在线评测平台的”开源 vs 商业”选型经济学

工业团队选 langsmith(商业)还是 langfuse / phoenix(开源)的决策不只看技术,更看经济学。一份具体测算:

商业平台 langsmith 年度成本(10 人团队 / 月调用 100 万 trace):

seat: 10 人 × $39/月 = $4,680/年
trace 量超额: $1-3K/年
高级功能(dataset / eval API): 包含
合计: $6-8K / 年 ≈ ¥4.5-6 万/年

开源 langfuse 自托管年度成本(同等规模):

服务器(1 台 4C16G + 200GB): ¥1.5-2 万/年
工程师维护时间: 0.2 FTE × ¥30 万 = ¥6 万/年
LLM-judge API 调用费: ¥3-6 万/年
合计: ¥10-14 万/年

商业版反而更便宜!原因:开源的”免费”是有代价的——服务器 + 工程师维护时间。这个成本在小团队规模下比商业 SaaS 还高。

转折点:当团队规模到 50+ 人或月 trace 量到 1000 万+,商业版会变贵(按 seat / trace 量计费),开源自托管的”固定成本”开始占优。

工程团队的判断:

  • < 30 人 / 月 trace < 500 万 → 商业版
  • 30-100 人 / 月 trace 500 万 - 5000 万 → 看合规要求
  • 100+ 人 / 月 trace > 5000 万 → 开源自托管

中国数据合规场景例外——任何规模都建议开源自托管,因为”数据出境”风险比节省的钱重要得多。

17.10.19 一个不容忽视的工程问题:trace 平台的”垂直锁定”

任何 SaaS 平台都有”vendor lock-in”风险——锁定后切换成本高。trace 平台特别明显:

  • LangSmith trace 数据用 LangSmith 专有格式
  • Langfuse 用 OpenTelemetry + 自家 schema
  • Phoenix 用纯 OTel 标准

锁定程度:LangSmith > Langfuse > Phoenix(OTel 中立度反序)

工程修法(避免锁定):

  1. 抽象层封装:业务代码不直接调 LangSmith API,过一层自家封装
  2. OpenInference 标准:trace 元数据用 OpenInference 命名约定
  3. 数据导出能力:定期把 trace 数据导出到自家数据仓库(备份 + 防锁定)
  4. 二次开发不写死:自定义 evaluator 用通用接口,不绑定平台特性

这些工程动作让团队在未来切换平台时,迁移成本从”半年重写”降到”1-2 周适配”。看似繁琐的抽象层,是评测体系长期可维护的工程保险。

17.10.20 一个不容易做对的工程话题:trace 数据的”语义切片”

trace 数据的价值不在于”全部存下来”,而在于”能按业务相关维度切片”。常被忽视的”语义切片”维度:

  • 按 user cohort:付费用户 vs 免费用户的评测分数差异
  • 按 session 长度:短 session vs 长 session 的失败模式差异
  • 按 referrer:从 SEO / 直接访问 / API 调用三类来源的不同质量
  • 按时段:高峰 / 低峰时段的延迟与质量差异
  • 按 model version:不同模型版本的实际表现对比

每个切片都对应一种”业务洞察”。比如发现”长 session 的 Faithfulness 更低” → 提示模型在长上下文上有问题;“高峰时段 Latency 飙升” → 提示需要扩容。

工程实务:trace 入库时把这些切片维度作为 metadata 存储。Dashboard 支持按任一维度切片观察。这种”多维分析能力”是 trace 系统的核心价值——比”trace 数据存下来”重要 10 倍。

17.10.21 trace 平台与 RUM (Real User Monitoring) 的协同

成熟团队会把 LLM trace 平台与传统 RUM(Real User Monitoring)系统对接。RUM(如 Datadog RUM、Sentry)跟踪前端用户行为,trace 跟踪 LLM 调用——两者协同能形成完整的用户体验视图:

RUM 数据: 用户停留时长 / 点击 / 滚动 / 跳出
+ Trace 数据: LLM 调用 / 评测分数 / 失败 case
= 完整用户画像

具体集成:

  • 用户的 RUM session_id 关联到该用户产生的所有 LLM trace
  • 跳出率高的 session → 关联的 LLM trace 是否质量低
  • 用户拇指向下 → 对应的 LLM trace 标记为 hard case

这种 “RUM + LLM trace” 的协同让评测体系从”LLM 单点”扩展到”用户体验全景”。也是 LLM 应用从”功能完整”走向”用户体验优化”的关键工程基础。

工业实务:RUM 数据 + LLM trace 的关联在 LangSmith / Langfuse 等平台还需要自己接(默认不集成)。需要在 SDK 集成时显式传递 user / session id 关联。这是评测平台的下一个工程演进方向——从”LLM-only” 升级到”产品体验全栈”。

17.10.22 在线评测平台的”未来 5 年趋势”

读完本章后,给读者一份在线评测平台的未来趋势预判(基于 2026 年初的观察):

趋势 1:标准化

  • OpenInference 等 trace 标准会逐步普及
  • 不同平台的 trace 数据互通性增强
  • vendor lock-in 风险降低

趋势 2:智能化

  • 平台自动从 trace 中发现失败模式
  • AI 辅助的 evaluator 推荐
  • 主动告警从”指标超阈值”升级到”行为异常”

趋势 3:合规化

  • 内置 PII 检测 / 数据保留 / 用户撤回
  • 适配 EU AI Act / NIST RMF 等监管要求
  • 审计 trail 成为默认能力

趋势 4:与 product analytics 融合

  • LLM trace 与产品行为数据打通(参见 §17.10.21 的 RUM 协同)
  • 完整产品体验视角,不只是 LLM 调用

趋势 5:开源加速

  • Langfuse / Phoenix 等开源工具持续追赶商业版
  • 自托管成本下降,社区贡献增加

工程团队的应对:

  • 选平台时不只看现在功能,看是否在以上 5 个趋势上投入
  • OpenInference 兼容是最低标准
  • 评估开源 vs 商业时考虑 5 年周期

这种”5 年趋势”判断让平台选型不只是”今天最好的”,更是”5 年仍然不被淘汰的”。

17.10.23 一份给 trace 平台运维工程师的”6 大能力”清单

最后给 trace 平台 owner 的能力清单:

  1. 数据建模:能设计 trace schema 适配业务需求
  2. 采样工程:能根据流量 / 成本动态调采样率
  3. 存储优化:会用 ClickHouse / Parquet 等列存技术
  4. 隐私合规:理解 PII / GDPR / 个保法
  5. 可观测性:能用 Grafana / Datadog 设计 dashboard
  6. 跨团队协作:能与算法 / 应用 / DevOps / 合规对接

6 项能力齐全的工程师是”trace 平台 owner”的合格人选。任一缺位都会让 trace 平台运维出问题。

工业实务:招聘或培养这种岗位时,6 项能力是面试 / 评估的标准。读完本章读者已经具备 #1-#3 的核心知识,剩下 #4-#6 需要专项学习——但本书已经给了充足的入门门槛。

17.10.24 一个工程现实:trace 平台的”成熟周期”

读完本章后,给在线评测平台的”成熟周期”判断:

平台年龄状态投入风险
0-1 年early adopter高(API 频繁 break)
1-2 年早期生态中(核心功能稳定)
2-3 年主流采用低(feature 完善)
3-5 年成熟极低(行业事实标准)
5+ 年维护期风险升高(可能停滞或被替代)

按这个时间表:

  • LangSmith(2023 公开):刚进入”主流采用”
  • Langfuse(2023 开源):进入”主流采用”
  • Phoenix(2023 开源):进入”主流采用”

工业团队的判断:当下 3 家都处于”低风险”阶段,可放心投入。但每年关注一次成熟度变化——5 年后可能有新的”事实标准”出现,要预留切换的工程灵活性(参见 §17.10.19 vendor lock-in 防御)。

17.10.25 在线评测平台的”未来 trace 体量”预估

随着 LLM 应用规模增长,trace 体量也在指数增长。未来 5 年的预估:

  • 2026:典型企业 LLM 应用月 trace 量 10-100 万
  • 2027:100 万-1000 万
  • 2028:1000 万-1 亿
  • 2029:1 亿+
  • 2030:典型企业 trace 量与今天的 SaaS 应用 metric 量相当

这种指数增长会让现有平台架构面临挑战:

  • 存储:1 亿/月 trace × 平均 5KB = 500GB/月 = 6TB/年
  • 判分:100% 评测在大规模下不可行,必须采样 + 分层
  • 查询:跨百万 trace 的分析查询要 ClickHouse / Iceberg 等数据栈

工程团队的应对:选平台时考虑”5 年后体量”,不只看”今天的体量”。LangSmith / Langfuse / Phoenix 都在向”百万级 / 亿级”演化,但具体进度有差异——这是平台选型的”长期赌注”。

17.10.26 一份”在线评测平台是否成功”的判断信号

读完本章后,给一份”团队的在线评测平台是否真的在工作”的诊断信号:

□ 平台每天有人主动看(不只是"配了告警")
□ 月度有 5+ 个 trace 因为告警被深入分析
□ 季度有指标因为漂移而调整
□ 在线评测的发现能反哺到离线集(每月 ≥ 5 条)
□ 工程师 review PR 时会看 trace 链接
□ PM 在做产品决策时会引用在线评测数据
□ 合规审计能看到完整的 trace 历史

7 项全过 = 平台在生产中真正发挥价值。3-5 项过 = 平台在用但没充分用。0-2 项过 = 平台只是摆设、需要重新审视投入。

工业实务:把这份信号清单作为月度自查项。如果发现平台沦为摆设 → 立即审视 owner / 流程 / 文化。技术上做到不难,“真的有人用”才是难点。

17.10.27 在线评测平台的”多区域”考量

国际化 LLM 应用上线时还有一个工程考量——多区域 trace 平台部署。具体挑战:

  • 数据主权:欧盟用户的 trace 必须留在欧盟(GDPR)
  • 延迟:trace 推送跨大洲会增加 100-500ms 延迟
  • 法律责任:不同司法管辖区的法律责任不同
  • 监管合规:EU AI Act / NIST RMF / 中国合规各有差异

工程修法:

  • 每区域独立 trace 平台:欧盟用 EU 实例、美国用 US 实例、亚太用 APAC 实例
  • 数据不跨境:每区域的 trace 数据本地存储 + 处理
  • 合规配置:每区域单独配置数据保留期 / 用户撤回流程
  • 统一 dashboard:跨区域指标可聚合,但原始数据不汇聚

工业实务:跨国 LLM 应用必须从第一天就考虑多区域架构。等用户量增长后再补多区域成本是 5-10 倍。这是国际化 LLM 应用工程的”长期规划”项。

17.10.28 在线评测平台的”知识共享”价值

最后讨论一个非工程价值——在线评测平台是团队知识共享的基础设施

具体场景:

  • 工程师 A 发现一类失败模式,share trace URL 给团队
  • PM 看 trace 理解”用户实际怎么用产品”
  • 客服基于 trace 训练”识别用户问题的能力”
  • 新人 onboarding 看历史 trace 学习”什么是好回答”

这种”知识共享”超出技术范畴——是团队对产品的共同认知建立。在线评测平台不只是”质量监控工具”,更是”团队认知的载体”。

读完本章希望读者带走的最后一个认知:在线评测平台的价值不只在”评测”,更在”让团队对产品有共同认知”。这种认知统一是高效团队的关键基础。

17.10.29 在线评测平台的”投资 vs 自建”决策框架

读完整章方法学后,给一份”投资 SaaS 平台 vs 自建”的决策框架:

flowchart TD
  Start[需要在线评测] --> A{规模 / 合规要求?}
  A -->|低 + 海外可接受| SaaS[商业 SaaS<br/>langsmith]
  A -->|低 + 国内合规| OS[开源自托管<br/>langfuse / phoenix]
  A -->|高 + 极敏感| Build[完全自建]
  Build --> Cost[投入: 1-2 人 × 6 月]
  OS --> Cost2[投入: 0.3 人 × 持续]
  SaaS --> Cost3[投入: $5-50K/年]
  style SaaS fill:#dcfce7
  style OS fill:#fef3c7
  style Build fill:#fee2e2

90% 团队应该选 SaaS 或开源自托管。完全自建只在极特殊场景值得(参见 §17.10.15 自建技术债演化)。

读完本章希望读者带走的最强决策:评测平台不是核心业务,不要自建——除非你有极强的特殊理由。这是评测体系建设的最经济选择。

17.10.30 在线评测平台的”读完承诺”

读完整章 trace 平台方法学后,给读者一份”读完承诺”:

  • 我会从第一天就接入 trace 系统(不等”以后”)
  • 我会做 PII 自动脱敏(不让数据泄露)
  • 我会按业务规模选合适的平台(不盲目自建)
  • 我会让 trace 数据真正驱动决策(不只是”配了就忘”)
  • 我会季度审视平台是否还在工作

5 项承诺对应整章核心方法学。任何 LLM 应用开发者都该走这条路径。

读完本章希望读者带走的最朴素认知:trace 系统是 LLM 应用的”心电图”——没有它你不知道产品的真实状态。从第一天接入 trace、长期投入、持续利用——这是 LLM 应用工程化的基础设施级要求。

17.10.31 三家平台的”具体定价对比”(2026 年初)

最后给一份基于公开定价的具体对比,让读者做选型决策时有数字依据:

LangSmith(商业 SaaS):

  • Free tier:5k traces/月、单用户
  • Plus:$39/seat/月、50k traces/月
  • Enterprise:定制定价(典型起步 $10k+/年)
  • 超额 trace:$0.5 / 1k traces

Langfuse(开源 + 商业):

  • 开源自托管:免费(自付服务器成本)
  • Langfuse Cloud Free:5k 单元/月
  • Cloud Hobby:$59/月、100k 单元
  • Cloud Pro:$199/月、500k 单元
  • Cloud Enterprise:定制定价

Phoenix(Apache 2.0 完全开源):

  • 自托管:免费(自付服务器成本)
  • Arize 商业版:定制定价

注:定价可能变化,以官网最新为准。

按月度 trace 量算典型工业团队的总成本:

月 trace 量LangSmithLangfuse CloudLangfuse 自托管Phoenix 自托管
50k$39+/月$59/月¥1k 服务器¥1k 服务器
500k$50-100/月$199/月¥3-5k 服务器¥3-5k 服务器
5M$200-500/月$500-1000/月¥1-3 万服务器¥1-3 万服务器
50M$1000+/月$2000+/月¥3-10 万服务器¥3-10 万服务器

工程团队的判断:

  • 50M+ 规模:自托管显著划算
  • < 5M 规模:Cloud 更省工程时间
  • 中国合规:自托管唯一选项
  • 海外创业团队:商业 Cloud 起步最快

读完本章希望读者带走的最具体决策依据:按你团队当前 trace 量算实际成本。30 分钟做一次真实计算,比”凭直觉觉得贵 / 便宜”靠谱得多。

17.10.32 一份 trace 数据导出与归档的完整脚本

整合本章方法学,给一份”trace 数据每月归档到 S3”的完整脚本:

# trace_archiver.py
import boto3
import gzip
import json
from datetime import datetime, timedelta
from pathlib import Path
from langfuse import Langfuse

class TraceArchiver:
    def __init__(self, langfuse_client, s3_bucket: str, retention_days: int = 30):
        self.lf = langfuse_client
        self.s3 = boto3.client("s3")
        self.bucket = s3_bucket
        self.retention = retention_days

    def archive_month(self, year: int, month: int):
        """归档某月的所有 trace 到 S3 并 PII 脱敏"""
        start = datetime(year, month, 1)
        end = (start + timedelta(days=32)).replace(day=1)

        traces = self.lf.fetch_traces(
            from_timestamp=start, to_timestamp=end, limit=1_000_000,
        )

        # PII 脱敏
        scrubbed = [self._scrub_pii(t.dict()) for t in traces]

        # gzip 压缩
        archive_path = Path(f"/tmp/traces_{year}_{month:02d}.jsonl.gz")
        with gzip.open(archive_path, "wt", encoding="utf-8") as f:
            for trace in scrubbed:
                f.write(json.dumps(trace, ensure_ascii=False) + "\n")

        # 上传 S3
        s3_key = f"traces/{year}/{month:02d}/traces.jsonl.gz"
        self.s3.upload_file(
            str(archive_path), self.bucket, s3_key,
            ExtraArgs={"StorageClass": "GLACIER_IR"},
        )

        # 删除归档过的 hot trace(节省热存储)
        for trace in traces:
            if (datetime.now() - trace.timestamp).days > self.retention:
                self.lf.delete_trace(trace.id)

        return {
            "archived": len(scrubbed),
            "s3_path": f"s3://{self.bucket}/{s3_key}",
            "size_mb": archive_path.stat().st_size / 1024 / 1024,
        }

    def _scrub_pii(self, trace: dict) -> dict:
        """PII 脱敏"""
        import re
        text = json.dumps(trace, ensure_ascii=False)
        text = re.sub(r"\b\d{11}\b", "[PHONE]", text)
        text = re.sub(r"\b\d{18}\b", "[ID]", text)
        text = re.sub(r"\b[\w.-]+@[\w.-]+\.\w+\b", "[EMAIL]", text)
        return json.loads(text)

约 60 行代码完成 trace 归档的完整生命周期(月度归档 / PII 脱敏 / gzip 压缩 / S3 GLACIER 冷存储 / 删除热 trace)。1 年 trace 数据归档到 GLACIER 的成本约是 hot SSD 的 1/20。

17.10.33 三家平台的”接入代码”对照

整合本章方法学,给一份”同一个 RAG 应用接入三家平台的代码差异”对照:

LangSmith:

from langsmith import traceable

@traceable(name="rag-pipeline")
def my_rag(query: str):
    contexts = retriever.invoke(query)
    answer = llm.invoke(f"Q: {query}\nContext: {contexts}")
    return answer
# 自动 trace 到 LangSmith dashboard

Langfuse:

from langfuse.decorators import observe

@observe(name="rag-pipeline")
def my_rag(query: str):
    contexts = retriever.invoke(query)
    answer = llm.invoke(f"Q: {query}\nContext: {contexts}")
    return answer
# 自动 trace 到 Langfuse

Phoenix(OTel 原生):

from openinference.instrumentation.openai import OpenAIInstrumentor
from phoenix.otel import register

tracer_provider = register(project_name="my-rag-app")
OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)

# 普通函数, 无装饰器
def my_rag(query: str):
    contexts = retriever.invoke(query)
    answer = llm.invoke(f"Q: {query}\nContext: {contexts}")
    return answer
# OTel auto-instrument 自动 trace

接入差异对照表:

维度LangSmithLangfusePhoenix
API 风格@traceable@observeOTel auto-instrument
侵入度中(要装饰器)中(要装饰器)低(零代码)
配置复杂度低(只要 API key)低(只要 API key)中(要 register OTel)
跨语言支持Python / JSPython / JS / Java / GoOTel 全语言
学习曲线略陡(OTel 概念)

工程团队的实务:

  • 已有 OTel 栈 → Phoenix 零侵入接入
  • 全 Python / JS 团队 → LangSmith / Langfuse 装饰器最简
  • 多语言项目 → Langfuse(最广支持)或 Phoenix(OTel)

读完本章希望读者带走的最具体行动:今天就拿 5 行代码接入一家 trace 平台。从”读懂”到”用上”——只差 5 行装饰器。

17.10.34 一份 OpenInference Semantic Conventions 字段速查表

OpenInference 是 Phoenix 与 OTel 共同推动的 LLM trace 标准(github.com/Arize-ai/openinference),它在 OTel 的 SpanAttributes 之上定义了 LLM 专属字段。以下是工程实务最常用的 21 个属性(覆盖 95% 场景):

字段名类型必填工程含义
openinference.span.kindstringLLM / RETRIEVER / EMBEDDING / TOOL / AGENT / CHAIN / RERANKER / GUARDRAIL
llm.model_namestringgpt-4o / claude-opus-4 / qwen-max
llm.providerstringopenai / anthropic / azure
llm.systemstring-提供商生态:openai / anthropic / vertexai
llm.input_messagesjson[{role, content}] 多轮消息
llm.output_messagesjson模型回应消息
llm.token_count.promptintprompt tokens
llm.token_count.completionintcompletion tokens
llm.token_count.totalinttotal tokens
llm.invocation_parametersjson-{temperature, top_p, max_tokens}
llm.toolsjson-tool calling schema
llm.function_calljson-模型选择调用的 tool
input.value / output.valuestring通用 input/output(非 LLM span 用)
retrieval.documentsjson-RAG 检索结果 [{id, content, score}]
retrieval.querystring-检索 query 原文
embedding.embeddingsjson-向量化结果
tool.name / tool.parametersstring/json-tool 调用的函数名 + 入参
tool.json_schemajson-tool 的 JSON Schema
session.id / user.idstring-会话与用户标识
metadatajson-自定义业务字段
tag.tagsarray-用于分类的多标签

工程实务:当一个团队同时使用 LangSmith 与 Phoenix 时,OpenInference 是唯一的”中间语”——只要 SDK 写入这套属性,两端都能解析。这是 §17.10.19 “垂直锁定”的逃生方案。

from openinference.semconv.trace import SpanAttributes
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("llm_call") as span:
    span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, "LLM")
    span.set_attribute(SpanAttributes.LLM_MODEL_NAME, "gpt-4o")
    span.set_attribute(SpanAttributes.LLM_PROVIDER, "openai")
    span.set_attribute(SpanAttributes.LLM_TOKEN_COUNT_PROMPT, 124)
    span.set_attribute(SpanAttributes.LLM_TOKEN_COUNT_COMPLETION, 89)
    span.set_attribute(SpanAttributes.INPUT_VALUE, prompt)
    span.set_attribute(SpanAttributes.OUTPUT_VALUE, response)

10 行代码完成”标准化的 LLM trace”——不绑定任何平台,能被 LangSmith / Langfuse / Phoenix / Datadog / 自建 OTel 栈通用消费。这是 LLM 可观测的”长期资产”——把所有 trace 都按这套规范写入,3 年后换平台的迁移成本接近零。

17.10.35 一份 Trace 采样的”分层抽样”算法

§17.5 提到了在线 trace 采样的概念,但未深入算法。生产环境 1 亿日 trace 量级时,纯均匀采样会让”罕见但重要”的 case(如 user feedback 差评、tool call 失败)淹没在海量正常流量里。下面是一份分层采样算法实现,是 LangSmith / Datadog / Honeycomb 等平台共同采用的核心思路:

import random
import hashlib
from dataclasses import dataclass
from typing import Callable
from collections import defaultdict

@dataclass
class TraceMetadata:
    trace_id: str
    user_feedback: str | None
    error: bool
    tool_call_failed: bool
    latency_ms: int
    is_high_value_user: bool

@dataclass
class SamplingDecision:
    sampled: bool
    reason: str
    stratum: str

class StratifiedTraceSampler:
    """分层采样:保证罕见高价值 trace 满采,常态流量降采"""

    STRATA_RATES = {
        "user_negative_feedback": 1.0,
        "error": 1.0,
        "tool_call_failed": 1.0,
        "high_value_user": 0.5,
        "long_latency_p99": 0.3,
        "normal": 0.01,
    }

    def __init__(self, base_rate: float = 0.01,
                 latency_p99_ms: int = 5000):
        self.base_rate = base_rate
        self.latency_p99_ms = latency_p99_ms
        self.stratum_counts = defaultdict(int)

    def _stratify(self, m: TraceMetadata) -> str:
        if m.user_feedback == "negative":
            return "user_negative_feedback"
        if m.error:
            return "error"
        if m.tool_call_failed:
            return "tool_call_failed"
        if m.latency_ms > self.latency_p99_ms:
            return "long_latency_p99"
        if m.is_high_value_user:
            return "high_value_user"
        return "normal"

    def _deterministic_dice(self, trace_id: str) -> float:
        """用 trace_id 的 hash 决定采样命中——同 trace 多次决策一致"""
        h = hashlib.md5(trace_id.encode()).hexdigest()[:16]
        return int(h, 16) / float(2 ** 64)

    def decide(self, m: TraceMetadata) -> SamplingDecision:
        stratum = self._stratify(m)
        rate = self.STRATA_RATES[stratum]
        dice = self._deterministic_dice(m.trace_id)
        sampled = dice < rate
        if sampled:
            self.stratum_counts[stratum] += 1
        return SamplingDecision(
            sampled=sampled,
            reason=f"stratum={stratum} rate={rate} dice={dice:.4f}",
            stratum=stratum,
        )

    def report(self) -> dict:
        total = sum(self.stratum_counts.values())
        return {
            "total_sampled": total,
            "by_stratum": dict(self.stratum_counts),
            "stratum_pct": {k: round(v / max(total, 1) * 100, 2)
                            for k, v in self.stratum_counts.items()},
        }
flowchart TB
  T[trace 入栈] --> Q1{user_feedback = negative?}
  Q1 -->|是| S1[100% 采]
  Q1 -->|否| Q2{error?}
  Q2 -->|是| S1
  Q2 -->|否| Q3{tool_call 失败?}
  Q3 -->|是| S1
  Q3 -->|否| Q4{p99 长尾?}
  Q4 -->|是| S2[30% 采]
  Q4 -->|否| Q5{高价值用户?}
  Q5 -->|是| S3[50% 采]
  Q5 -->|否| S4[1% 采]

  style S1 fill:#ffebee
  style S4 fill:#e8f5e9

工程实务的 4 个关键设计:

  • 重要 stratum 100% 采:差评 / 异常 / tool 失败一条不漏
  • 延迟长尾 30%:保证慢请求得到充足分析样本
  • 高价值用户 50%:付费 / VIP 用户的体验问题不能被均值掩盖
  • deterministic dice:用 trace_id 的 hash 决定采样命中——保证”同一 trace 在重试时仍命中或仍不命中”,不会丢失上下文

数学上的好处:1 亿日 trace 量级 + 1% 基准采样率 + 6 个 strata,总采样数约 100-150 万条/天。其中:

  • 错误 / 差评类(约 1-2 万)100% 采样 → 完整捕获
  • 高价值用户 trace(约 3 万)50% 采样
  • 长尾延迟(约 5 万)30% 采样
  • 普通流量 1% 采样 → 平均 100 万条

这种分层让”工程师每天打开 dashboard 看到的样本里 80% 都有诊断价值”——比纯随机采样的 dashboard 信息密度高一个数量级。

17.10.36 一份”在线 trace → 黄金集”自动 mining 流水线

§3.6 提出 Hard Case Mining,§17.10 给了 trace 平台。本节把两者衔接起来——一份能直接跑的 mining 流水线:从 trace 平台抓出”明显 trace 失败”的样本,自动转成黄金集草稿,提 PR 给评测工程师 review。

import asyncio
import json
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Iterable, Callable, Awaitable
from pathlib import Path

@dataclass
class HardCaseCandidate:
    source_trace_id: str
    user_query: str
    bot_answer: str
    failure_signal: str
    severity: float
    suggested_label: str
    suggested_expected: str

class TraceToGoldenSetMiner:
    """从在线 trace 中自动挖掘 hard case,输出黄金集 PR draft"""

    SIGNAL_WEIGHTS = {
        "user_thumbs_down": 1.0,
        "judge_score_low": 0.8,
        "tool_call_error": 0.7,
        "long_latency_p99": 0.4,
        "user_followup_repeated": 0.6,
    }

    def __init__(self, trace_fetcher: Callable[[datetime, datetime], Awaitable[list[dict]]],
                 dedup_set: set[str], judge_fn: Callable):
        self.fetch = trace_fetcher
        self.dedup = dedup_set
        self.judge = judge_fn

    def _score_severity(self, trace: dict) -> tuple[float, str]:
        signals = []
        score = 0.0
        if trace.get("user_feedback") == "negative":
            signals.append("user_thumbs_down")
            score += self.SIGNAL_WEIGHTS["user_thumbs_down"]
        if trace.get("judge_score", 1.0) < 0.6:
            signals.append("judge_score_low")
            score += self.SIGNAL_WEIGHTS["judge_score_low"]
        if any(t.get("error") for t in trace.get("tool_calls", [])):
            signals.append("tool_call_error")
            score += self.SIGNAL_WEIGHTS["tool_call_error"]
        if trace.get("latency_ms", 0) > 5000:
            signals.append("long_latency_p99")
            score += self.SIGNAL_WEIGHTS["long_latency_p99"]
        if trace.get("user_followup_similar", False):
            signals.append("user_followup_repeated")
            score += self.SIGNAL_WEIGHTS["user_followup_repeated"]
        return min(score, 1.0), "+".join(signals) or "none"

    def _question_hash(self, q: str) -> str:
        import hashlib
        return hashlib.md5(q.lower().strip().encode()).hexdigest()[:12]

    async def _suggest_expected(self, query: str, bot_answer: str) -> str:
        """用 reasoning judge 拟一个 expected answer 草稿"""
        prompt = (f"Query: {query}\nBot answer (failed): {bot_answer}\n"
                  "Write a 2-sentence ideal answer that the bot SHOULD have given.")
        return await self.judge(prompt)

    async def mine(self, days_back: int = 7,
                   min_severity: float = 0.6) -> list[HardCaseCandidate]:
        end = datetime.now()
        start = end - timedelta(days=days_back)
        traces = await self.fetch(start, end)
        candidates = []
        for trace in traces:
            sev, signal = self._score_severity(trace)
            if sev < min_severity:
                continue
            qhash = self._question_hash(trace["user_query"])
            if qhash in self.dedup:
                continue
            self.dedup.add(qhash)
            expected = await self._suggest_expected(
                trace["user_query"], trace["bot_answer"])
            candidates.append(HardCaseCandidate(
                source_trace_id=trace["trace_id"],
                user_query=trace["user_query"],
                bot_answer=trace["bot_answer"],
                failure_signal=signal,
                severity=round(sev, 3),
                suggested_label=signal.split("+")[0],
                suggested_expected=expected,
            ))
        return sorted(candidates, key=lambda c: -c.severity)

    def emit_pr_draft(self, candidates: list[HardCaseCandidate],
                       out_path: Path):
        records = [{
            "id": f"mined-{c.source_trace_id[:8]}",
            "input": c.user_query,
            "expected": c.suggested_expected,
            "tags": ["mined-from-prod", c.suggested_label],
            "severity": c.severity,
            "source_trace": c.source_trace_id,
            "needs_human_review": True,
        } for c in candidates]
        out_path.write_text("\n".join(json.dumps(r, ensure_ascii=False)
                                       for r in records))
flowchart LR
  T[在线 trace 平台] -->|fetch 7 天| FT[trace list]
  FT --> SC[severity 评分]
  SC -->|< 0.6| DROP[丢弃]
  SC -->|≥ 0.6| DDUP{已存在?}
  DDUP -->|是| DROP
  DDUP -->|否| EXP[reasoning judge<br/>拟 expected answer]
  EXP --> CAND[HardCaseCandidate]
  CAND --> EMIT[输出 jsonl + 提 PR]
  EMIT --> REVIEW[评测工程师 review]
  REVIEW --> GOLD[合入黄金集]

  style EXP fill:#e3f2fd
  style REVIEW fill:#fff3e0
  style GOLD fill:#e8f5e9

工程实务的 5 条 mining 规则:

  • 每周跑一次:cron 触发,输出当周 hard case PR
  • 去重必做:用 query 的 md5 做幂等,防止”同一 query 反复入集”
  • 严重度阈值 0.6:太低引入噪音,太高漏掉真问题
  • expected 是草稿不是定稿:标 needs_human_review: True,必须人工 review 后才能合入
  • 来源 trace_id 必留:以后需回溯”这条 case 当初为何加入”

具体效果:在 1 亿日 trace 量级团队,每周 mining 出 30-50 条候选 → review 通过约 60-70% → 月增 80-130 条黄金集。半年下来黄金集从 500 长到 1500-2500,且全部来自真实失败——这是评测体系”自我演化”的终极形态。

把 mining 视为评测体系的”进化引擎”——没它的评测集会缓慢腐朽;有它的评测集越用越精准。这是 §3.6 与 §17.10 之间的工程粘合剂。

17.10.37 一份”User Feedback → Trace 关联”的最小集成实现

§17.5 提到用户反馈是 trace 的核心信号源。但工业实务中的痛点是”用户点了拇指 → 找不到对应 trace”——前端 UI 与后端 trace 的 ID 关联缺失,反馈数据沦为孤岛。下面是这个工程粘合层的最小实现:

import asyncio
import time
import uuid
from dataclasses import dataclass, asdict
from typing import Callable, Awaitable
from datetime import datetime

@dataclass
class FeedbackEvent:
    feedback_id: str
    trace_id: str
    user_id: str
    feedback_type: str   # "thumbs_up" | "thumbs_down" | "rating" | "comment"
    value: int | str
    comment: str | None
    user_session_id: str
    submitted_at: str
    client_app: str

class FeedbackTraceLinker:
    """前端反馈 → 后端 trace 的关联粘合层"""

    def __init__(self, trace_platform_client,
                 trace_id_resolver: Callable[[str, str], Awaitable[str]]):
        self.platform = trace_platform_client
        self.resolve = trace_id_resolver

    async def receive_feedback(self, request: dict) -> FeedbackEvent:
        # 1. 反馈先入库(不阻塞用户)
        event = FeedbackEvent(
            feedback_id=str(uuid.uuid4()),
            trace_id=request.get("trace_id", ""),
            user_id=request["user_id"],
            feedback_type=request["type"],
            value=request["value"],
            comment=request.get("comment"),
            user_session_id=request["session_id"],
            submitted_at=datetime.utcnow().isoformat(),
            client_app=request.get("app", "web"),
        )

        # 2. 若前端没传 trace_id,反向查(用 session + 时间窗)
        if not event.trace_id:
            event.trace_id = await self.resolve(
                request["session_id"],
                request.get("estimated_response_time", time.time())
            )

        # 3. 写回 trace 平台(让 trace 自带反馈分)
        if event.trace_id:
            await self._enrich_trace(event)
        return event

    async def _enrich_trace(self, event: FeedbackEvent):
        score = {"thumbs_up": 1.0, "thumbs_down": 0.0,
                 "rating": event.value / 5 if isinstance(event.value, int) else None}.get(
            event.feedback_type, None)
        if score is None:
            return
        await self.platform.score(
            trace_id=event.trace_id,
            name=f"user.{event.feedback_type}",
            value=score,
            comment=event.comment,
            metadata={"feedback_id": event.feedback_id,
                      "user_id": event.user_id,
                      "session": event.user_session_id},
        )

    async def join_with_traces(self, feedbacks: list[FeedbackEvent],
                                lookup_window_min: int = 30):
        """批处理:把零散 feedback 与 trace 做 batch join"""
        from collections import defaultdict
        by_session = defaultdict(list)
        for f in feedbacks:
            by_session[f.user_session_id].append(f)
        results = []
        for session_id, events in by_session.items():
            traces = await self.platform.fetch_traces_by_session(
                session_id, window_min=lookup_window_min)
            for event in events:
                matched = self._match_by_time(event, traces)
                if matched:
                    event.trace_id = matched["trace_id"]
                    await self._enrich_trace(event)
                results.append(event)
        return results

    def _match_by_time(self, event: FeedbackEvent,
                        traces: list[dict]) -> dict | None:
        """同 session 内最近的 trace(5 秒内)"""
        ev_time = datetime.fromisoformat(event.submitted_at).timestamp()
        candidates = [t for t in traces
                      if abs(t["completed_at"] - ev_time) < 5]
        return min(candidates, key=lambda t: abs(t["completed_at"] - ev_time)) \
            if candidates else None
flowchart TB
  UF[用户点拇指] -->|带 trace_id 的请求| API[/feedback API/]
  UF2[用户填评分但前端漏传 trace_id] -->|带 session_id 的请求| API
  API --> EV[FeedbackEvent 入库]
  EV -->|trace_id 已知| EN[enrich trace 直接挂分]
  EV -->|trace_id 缺失| RES[resolver: session + time 反查]
  RES -->|找到| EN
  RES -->|超时未找到| ORPH[孤儿反馈库]
  EN --> TP[trace 平台]
  TP --> DB[评测 dashboard]

  style ORPH fill:#fff3e0
  style EN fill:#e8f5e9

工程实务的 4 条粘合层设计:

  1. 前端必带 trace_id——后端响应时把 trace_id 一起返给前端,前端反馈时回传。这是最便宜稳定的方式
  2. session_id + 时间窗反查作为兜底——前端有时漏传或失效,后端用 session + 5 秒窗匹配
  3. 先入库再 enrich——用户提交反馈瞬间 ack,trace 关联异步处理(避免反馈接口受 trace 平台延迟影响)
  4. 孤儿反馈不要丢——找不到 trace 的反馈也保留在独立表,每周人工抽查 → 推动前端改埋点

具体例子:1000 万日 trace 中约 0.5% 用户主动反馈(5 万条)。粘合层成功率应 ≥ 95%——剩 5% 孤儿反馈集中分析能找到前端埋点漏洞,每月修 2-3 个可让粘合率涨到 98%+。

研究背景:

  • LangSmith 在 2024-Q1 推 “feedback API”,思路与本节一致
  • Langfuse 的 score() API 文档把这种”用户反馈写回 trace”作为标准模式
  • Anthropic 的 Claude.ai 后端公开过 “thumbs feedback → eval pipeline” 的 architecture diagram

部署该粘合层后能解锁评测体系的 3 个关键能力:

  1. dashboard 上每条 trace 旁有”用户反馈分”chip
  2. mining 流水线(§17.10.36)能直接消费反馈信号
  3. 元评测(§8.6)的”calibration”有了真实人工 anchor

把它视为”在线评测平台的最后一公里”——评测平台、trace、用户反馈在此交汇。

17.10.38 trace 平台的”PII / 敏感信息脱敏”工程实践

§17.10.16 提出了”PII 脱敏”的概念,本节给出生产级的工程实现——这是 trace 平台合规上线的硬约束(GDPR / CCPA / 个保法)。

import re
from dataclasses import dataclass
from typing import Iterable, Pattern, Callable

@dataclass
class RedactionRule:
    name: str
    pattern: Pattern
    replacement: str
    severity: str   # "high" | "medium" | "low"

@dataclass
class RedactionReport:
    original_length: int
    redacted_length: int
    redactions_count: int
    by_rule: dict[str, int]

class TraceRedactor:
    """trace 写入平台前的 PII 脱敏管线"""

    DEFAULT_RULES = [
        # 高敏:身份证 / 银行卡 / 手机
        RedactionRule(
            "cn_id_card",
            re.compile(r"\b\d{17}[\dXx]\b|\b\d{15}\b"),
            "[ID_CARD_REDACTED]",
            "high",
        ),
        RedactionRule(
            "bank_card",
            re.compile(r"\b\d{16}\b|\b\d{19}\b"),
            "[CARD_REDACTED]",
            "high",
        ),
        RedactionRule(
            "cn_mobile",
            re.compile(r"\b1[3-9]\d{9}\b"),
            "[PHONE_REDACTED]",
            "high",
        ),
        RedactionRule(
            "email",
            re.compile(r"\b[\w._%+-]+@[\w.-]+\.[A-Za-z]{2,}\b"),
            "[EMAIL_REDACTED]",
            "medium",
        ),
        RedactionRule(
            "ipv4",
            re.compile(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"),
            "[IP_REDACTED]",
            "low",
        ),
        # 中敏:地址
        RedactionRule(
            "cn_address",
            re.compile(r"[一-龥]{2,}(||||||||)[一-龥\d-]+"),
            "[ADDRESS_REDACTED]",
            "medium",
        ),
        # 低敏:可能是自然人姓名
        RedactionRule(
            "cn_name",
            re.compile(r"(?<!\w)[张王李赵刘陈杨黄周吴徐孙朱马胡郭林何高梁郑罗宋谢唐韩冯邓曹彭曾肖田董袁潘于蒋蔡余杜叶程苏魏吕丁任沈姚卢姜崔钟谭陆汪范金石廖贾夏韦付方白邹孟熊秦邱江尹薛闫段雷侯龙史陶黎贺顾毛郝龚邵万钱严覃武戴莫孔]{1}[一-龥]{1,2}(?!\w)"),
            "[NAME_REDACTED]",
            "low",
        ),
    ]

    def __init__(self, custom_rules: list[RedactionRule] = None,
                 enabled_severities: set[str] = None):
        self.rules = list(self.DEFAULT_RULES) + (custom_rules or [])
        self.enabled = enabled_severities or {"high", "medium", "low"}

    def redact(self, text: str) -> tuple[str, RedactionReport]:
        original_len = len(text)
        by_rule = {}
        redacted = text
        for rule in self.rules:
            if rule.severity not in self.enabled:
                continue
            matches = rule.pattern.findall(redacted)
            if matches:
                by_rule[rule.name] = len(matches)
                redacted = rule.pattern.sub(rule.replacement, redacted)
        return redacted, RedactionReport(
            original_length=original_len,
            redacted_length=len(redacted),
            redactions_count=sum(by_rule.values()),
            by_rule=by_rule,
        )

    def redact_trace(self, trace: dict) -> tuple[dict, list[RedactionReport]]:
        """递归脱敏 trace 的 input / output / metadata 字段"""
        reports = []
        cleaned = {}
        for k, v in trace.items():
            if isinstance(v, str):
                rv, report = self.redact(v)
                cleaned[k] = rv
                if report.redactions_count > 0:
                    reports.append(report)
            elif isinstance(v, dict):
                rv, sub_reports = self.redact_trace(v)
                cleaned[k] = rv
                reports.extend(sub_reports)
            elif isinstance(v, list):
                cleaned[k] = [self.redact(item)[0] if isinstance(item, str)
                               else item for item in v]
            else:
                cleaned[k] = v
        return cleaned, reports
flowchart LR
  T[trace 入栈] --> R[TraceRedactor]
  R --> H1{高敏匹配?}
  H1 -->|身份证 / 银行卡 / 手机| RED[替换 ##_REDACTED##]
  R --> M1{中敏?}
  M1 -->|email / 地址| RED
  R --> L1{低敏?}
  L1 -->|姓名 / IP| RED
  RED --> CLEAN[clean trace]
  CLEAN --> TP[trace 平台]
  R --> RP[RedactionReport]
  RP --> AUD[审计日志]

  style RED fill:#fff3e0
  style CLEAN fill:#e8f5e9

工程实务的 5 条 PII 脱敏准则:

  1. 必须在写入 trace 平台之前脱敏:一旦泄漏到第三方 SaaS,违法成本不可逆
  2. 保留 redaction 报告:合规审计需”何时何地脱敏了多少条 PII”
  3. enabled_severities 可配置:开发环境关 low、生产开 all
  4. 定期审计:抽样 trace 看是否有遗漏的新 PII 模式(如新身份证规则)
  5. 失败兜底:脱敏 fail 时整条 trace 标记 dropped 而非半 redacted

合规法规对照:

法规关键条款本工具如何满足
中国个保法 §13个人信息处理需明示同意脱敏 = 不可逆去标识化,可视为”处理已同意范畴”
GDPR Art. 17删除权redaction 报告可索引到 trace_id 删除
GDPR Art. 25data protection by design脱敏在设计层而非事后
HIPAA §164.514de-identification standard18 类 PHI 必脱敏(医疗场景需扩展规则)

具体例子:客服 chatbot trace 文本”我是张三,手机 13800138000,订单地址北京市朝阳区某路 100 号”——脱敏后变 “我是 [NAME_REDACTED],手机 [PHONE_REDACTED],订单地址 [ADDRESS_REDACTED]”。

研究背景:

  • Microsoft Presidio (github.com/microsoft/presidio) 是 PII 脱敏的开源标杆
  • AWS Comprehend / GCP DLP 是云厂商商业方案
  • 中国《个人信息保护法》§4 / §13 是脱敏的法律源头

部署 TraceRedactor 后,trace 平台从”PII 雷区”变成”合规可控的可观测基础设施”——任何审计、安全、合规问题都能从 redaction 报告快速答复。这是 LLM 应用大规模上线的合规底线。

17.10.39 一份”在线评测平台自建 vs SaaS”的成本拐点测算

§17.10.18 概念性讨论了开源 vs 商业的选择,本节给出量化的成本拐点——按月 trace 量算”什么时候自建更划算”。

import math
from dataclasses import dataclass

@dataclass
class PlatformCostComparison:
    monthly_trace_volume: int
    saas_monthly_usd: float
    self_hosted_monthly_usd: float
    breakeven_volume: int
    recommendation: str
    rationale: str

class TracePlatformCostCalculator:
    """计算 SaaS vs 自建在不同 trace 量下的成本拐点"""

    # SaaS 价格(基于 Langfuse Cloud / LangSmith 公开 2026 价目)
    SAAS_TIERS = [
        (50_000, 39),         # < 50k traces: $39/mo (个人)
        (1_000_000, 199),     # 50k-1M: $199/mo (team)
        (10_000_000, 999),    # 1M-10M: $999/mo (enterprise starter)
        (100_000_000, 4999),  # 10M-100M: $4999/mo (enterprise)
    ]

    # 自建固定成本:基础设施 + 1 名 SRE 0.3 FTE
    SELF_HOSTED_FIXED_USD = 600 + 5_000   # 服务器 $600 + 0.3 FTE $5000
    SELF_HOSTED_PER_MILLION_VAR_USD = 50  # 每百万 trace 增量存储 + 计算

    def saas_price(self, volume: int) -> float:
        for tier_max, price in self.SAAS_TIERS:
            if volume <= tier_max:
                return price
        return self.SAAS_TIERS[-1][1] * (volume / self.SAAS_TIERS[-1][0])

    def self_hosted_cost(self, volume: int) -> float:
        return (self.SELF_HOSTED_FIXED_USD +
                (volume / 1_000_000) * self.SELF_HOSTED_PER_MILLION_VAR_USD)

    def find_breakeven(self) -> int:
        """二分搜索找拐点"""
        lo, hi = 1000, 100_000_000
        while lo < hi:
            mid = (lo + hi) // 2
            if self.saas_price(mid) > self.self_hosted_cost(mid):
                hi = mid
            else:
                lo = mid + 1
        return lo

    def analyze(self, monthly_volume: int) -> PlatformCostComparison:
        saas = self.saas_price(monthly_volume)
        sh = self.self_hosted_cost(monthly_volume)
        breakeven = self.find_breakeven()

        if monthly_volume < breakeven:
            rec = "saas"
            rationale = f"流量 < {breakeven:,} 拐点,SaaS 更便宜"
        else:
            rec = "self_hosted"
            rationale = f"流量 ≥ {breakeven:,},自建省 ${saas - sh:.0f}/月"

        return PlatformCostComparison(
            monthly_trace_volume=monthly_volume,
            saas_monthly_usd=round(saas, 2),
            self_hosted_monthly_usd=round(sh, 2),
            breakeven_volume=breakeven,
            recommendation=rec,
            rationale=rationale,
        )
flowchart LR
  V[月度 trace 量] --> CALC[Cost Calculator]
  CALC --> S[SaaS 价]
  CALC --> H[自建价]
  S --> CMP{<br/>哪个便宜?<br/>}
  H --> CMP
  CMP -->|"v < ~12M"| SA[选 SaaS]
  CMP -->|"v ≥ ~12M"| SH[选自建]
  CMP --> BE[breakeven 点]
  BE -. "约 12M trace/月" .-> INFO[拐点信息]

  style SA fill:#e8f5e9
  style SH fill:#fff3e0

具体测算(基于 2026-Q1 公开价目):

月度 trace 量SaaS 月费自建月费推荐月度差
10k$39$5,605SaaS-$5,566
100k$199$5,605SaaS-$5,406
1M$199$5,650SaaS-$5,451
10M$999$6,100SaaS-$5,101
12M(拐点附近)$1,199$6,200临界-$5,001
50M~$2,500$8,100SaaS-$5,600
100M$4,999$10,600self-hosted-$5,601
500M$24,995$30,600self-hosted (但拐点可能再变)$-5,605

关键洞察:很多团队以为”自建一定更省”——其实拐点在 100M+ 流量。中小团队完全应该用 SaaS,把工程精力花在评测本身而非平台。

工程实务的 4 条选型经验:

  1. < 1M trace/月:必 SaaS,自建固定成本压不住
  2. 1M-50M:SaaS 仍合理,但要看团队工程力
  3. 50M-100M:临界点,看合规需求 + 开源偏好
  4. > 100M:自建省钱明显,但要评估 SRE 投入

不仅看价格还要看:

  • 合规线:某些行业(金融 / 政府)必自建,价格不在考量
  • 未来 12 月增长:流量会快速增长 → 早自建避免迁移成本
  • 团队 SRE 能力:没 SRE 自建 = 没 trace 平台

研究背景:

  • Datadog 公开过其 APM 内部成本曲线 ── 月 1B span 时自建优势显著
  • Honeycomb 在 2024-Q3 博客披露其客户 “trace 量 500M+ 自建率 60%”
  • LangSmith Cloud 与自托管的官方对比表(在文档 §pricing)也确认上述拐点附近

读者把 TracePlatformCostCalculator 接入团队”季度成本审计”——3 分钟得出”现在该 saas 还是 self-hosted”的明确答案。比”拍脑袋决定”靠谱 5x。

17.10.40 在线评测平台的”trace 数据保留策略”——成本与价值的平衡

trace 数据无限保留的成本是天文数字。下面给出工业团队的”分层保留 + 自动归档”策略:

import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Iterable

class TraceImportance(Enum):
    HOT = "hot"          # 热数据:最近 7 天,全量
    WARM = "warm"        # 温数据:8-30 天,仅采样
    COLD = "cold"        # 冷数据:1-12 月,仅 metadata
    ARCHIVED = "archived"  # 归档:12 月+,对象存储
    PURGED = "purged"     # 已删

@dataclass
class TraceRetentionDecision:
    trace_id: str
    age_days: int
    classification: TraceImportance
    storage_tier: str       # "hot_db" | "cold_db" | "s3_glacier"
    estimated_size_mb: float
    cost_per_month_usd: float

class TraceRetentionPolicy:
    """trace 数据分层保留策略"""

    HOT_DAYS = 7
    WARM_DAYS = 30
    COLD_DAYS = 365

    # 不同层成本(每 GB 每月)
    COST_HOT = 0.40        # SSD / Postgres
    COST_COLD = 0.05       # HDD / 压缩
    COST_ARCHIVED = 0.005  # S3 Glacier

    # 重要性提升器:高价值 trace 多保留
    IMPORTANCE_BOOSTERS = {
        "user_thumbs_down": 365,    # 差评 trace 至少留 1 年
        "safety_incident": 365 * 7, # 安全事件 7 年(合规)
        "training_candidate": 180,  # 用作微调的 6 个月
    }

    def classify(self, trace: dict) -> TraceRetentionDecision:
        age_days = (datetime.now() - datetime.fromisoformat(
            trace["created_at"]
        )).days

        # importance booster
        retention_floor = 0
        for tag, days in self.IMPORTANCE_BOOSTERS.items():
            if trace.get(tag):
                retention_floor = max(retention_floor, days)

        effective_days = age_days - retention_floor   # < 0 强制保留

        if effective_days < self.HOT_DAYS:
            cls, tier = TraceImportance.HOT, "hot_db"
            cost = (trace.get("size_kb", 1) / 1024 / 1024) * self.COST_HOT
        elif effective_days < self.WARM_DAYS:
            cls, tier = TraceImportance.WARM, "hot_db"
            cost = (trace.get("size_kb", 1) / 1024 / 1024) * self.COST_HOT
        elif effective_days < self.COLD_DAYS:
            cls, tier = TraceImportance.COLD, "cold_db"
            cost = (trace.get("size_kb", 1) / 1024 / 1024) * self.COST_COLD
        else:
            cls, tier = TraceImportance.ARCHIVED, "s3_glacier"
            cost = (trace.get("size_kb", 1) / 1024 / 1024) * self.COST_ARCHIVED

        return TraceRetentionDecision(
            trace_id=trace["trace_id"],
            age_days=age_days,
            classification=cls,
            storage_tier=tier,
            estimated_size_mb=round(trace.get("size_kb", 1) / 1024, 2),
            cost_per_month_usd=round(cost, 6),
        )

    def estimate_total_cost(self, traces: list[dict]) -> dict:
        decisions = [self.classify(t) for t in traces]
        from collections import Counter
        by_tier = Counter(d.classification.value for d in decisions)
        total_cost = sum(d.cost_per_month_usd for d in decisions)
        return {
            "total_traces": len(decisions),
            "by_classification": dict(by_tier),
            "monthly_storage_usd": round(total_cost, 2),
            "annual_storage_usd": round(total_cost * 12, 2),
        }
flowchart LR
  T[新 trace] --> AGE{age?}
  AGE -->|< 7d| HOT[hot_db SSD<br/>$0.40/GB-mo]
  AGE -->|7-30d| WARM[hot_db<br/>仅采样保留]
  AGE -->|30-365d| COLD[cold_db<br/>$0.05/GB-mo]
  AGE -->|> 365d| ARC[S3 Glacier<br/>$0.005/GB-mo]
  ARC -->|> 7yr 非合规| PUR[purge]

  T --> BOOST{重要 tag?}
  BOOST -->|safety_incident| FLOOR1[强制 7 年]
  BOOST -->|user_thumbs_down| FLOOR2[强制 1 年]

  style HOT fill:#ffebee
  style ARC fill:#e8f5e9
  style PUR fill:#fff3e0

工程实务的 4 条保留策略经验:

数据类型保留期存储层成本
普通 trace30dhot → cold中等
用户差评 trace1 年hot → cold略高
安全事件 trace7 年hot → cold → archived合规要求
训练候选 trace6 月hot 层高(频繁访问)

具体例子:1 亿 trace/月(每 trace 平均 5KB):

阶段数据量成本
7 天 hot23M 条 = 115 GB$46/月
30 天 cold70M 条 = 350 GB$17.5/月
1 年 archived1B 条 = 5 TB$25/月
7 年安全7B 条 = 35 TB$175/月
总计**263/263/月** ≈ 3.2k/年

对比”无差别全留 hot 1 年”:12B trace × 5KB × 0.40/GBmo0.40/GB-mo ≈ 24k/月 ── 节省 90+%

3 类常见 retention 错误:

错误现象修法
所有数据全留 hot月 storage $20k+ 突然高费用分层 + 自动迁移
用户差评 trace 30 天就删hard case mining 无源加 importance booster
没 archived 层合规审计要 1 年前的 trace 找不到加 S3 Glacier 归档

研究背景:

  • AWS Glacier / GCP Coldline 是分层存储行业标杆
  • DataDog APM 默认 retention 15 天,长留要付费——本节是”自家更精细”实现
  • LangSmith / Langfuse 都有类似 retention 设置,但默认值不一定符合业务

读者把 TraceRetentionPolicy 接入 trace 平台 cron job——每天扫描 + 迁移。半年后 storage 成本降到原来的 10-20%,同时合规要求满足。

17.10.41 一份在线评测的”实时告警”工程实现——10 秒内发现问题

§17.10.16 提到 trace 告警,但具体实现需要支持”sub-minute 级别响应”——下面给出生产级的实时告警工程方案:

import asyncio
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Iterable, Callable, Awaitable

@dataclass
class RealtimeAlert:
    alert_id: str
    triggered_at: str
    metric_name: str
    current_value: float
    threshold: float
    severity: str
    affected_traces: list[str]
    suggested_action: str

class RealtimeMetricMonitor:
    """对 trace 流的实时指标监控(sliding window)"""

    def __init__(self, window_seconds: int = 60,
                 alert_threshold_overrides: dict = None):
        self.window_sec = window_seconds
        self.events: dict[str, deque] = {}
        self.thresholds = {
            "thumbs_down_rate": 0.10,         # > 10% 触发
            "p95_latency_ms": 3000,           # > 3s
            "safety_violation_rate": 0.005,    # > 0.5%
            "tool_call_error_rate": 0.05,
            "hallucination_rate": 0.10,
            **(alert_threshold_overrides or {}),
        }

    def ingest_event(self, metric_name: str, value: float):
        """实时摄入一个事件"""
        now = datetime.now().timestamp()
        if metric_name not in self.events:
            self.events[metric_name] = deque()
        self.events[metric_name].append((now, value))
        # 清掉超过 window 的旧事件
        while (self.events[metric_name] and
                self.events[metric_name][0][0] < now - self.window_sec):
            self.events[metric_name].popleft()

    def check_alert(self, metric_name: str) -> RealtimeAlert | None:
        if metric_name not in self.events:
            return None
        events = self.events[metric_name]
        if len(events) < 10:
            return None   # 样本不够

        values = [v for _, v in events]
        # rate 类用 mean,latency 用 p95
        if "rate" in metric_name:
            current = sum(values) / len(values)
        else:
            current = sorted(values)[int(0.95 * len(values))]

        threshold = self.thresholds.get(metric_name)
        if not threshold:
            return None

        if current > threshold:
            severity = ("critical" if current > threshold * 2
                        else "warning")
            return RealtimeAlert(
                alert_id=f"alert-{metric_name}-{int(datetime.now().timestamp())}",
                triggered_at=datetime.now().isoformat(),
                metric_name=metric_name,
                current_value=round(current, 4),
                threshold=threshold,
                severity=severity,
                affected_traces=[],   # 需关联 trace_id
                suggested_action=self._suggest(metric_name, severity),
            )
        return None

    def _suggest(self, metric: str, severity: str) -> str:
        if severity == "critical":
            return f"立即冻结 deploy + page oncall({metric} 超阈 ≥ 2x)"
        return f"通知 Slack #evals + 1 小时内调查"
flowchart LR
  T[trace 流] --> ING[ingest_event]
  ING --> W[60s sliding window]
  W --> CALC[实时算 p95 / rate]
  CALC --> CMP{超阈?}
  CMP -->|是 + 2x| CRIT[critical → page]
  CMP -->|是 + 1x| WARN[warning → Slack]
  CMP -->|否| OK[正常]
  CRIT --> FREEZE[冻结 deploy]
  WARN --> SLA[1h 内查]

  style CRIT fill:#ffebee
  style WARN fill:#fff3e0
  style OK fill:#e8f5e9

工程实务的 4 类实时告警维度:

metricwindow阈值severity
safety_violation_rate60s> 0.5%critical (page)
thumbs_down_rate60s> 10%warning
p95 latency60s> 3swarning
tool_call_error_rate60s> 5%warning

具体例子:某客服 chatbot 周一 14:32 触发 safety_violation_rate=1.2%(窗口 60s 收 8 条违规 / 650 条流量):

  • alert 触发 → page oncall(PagerDuty)
  • alert 12 秒内到达手机
  • oncall 5 分钟内 ack + freeze deploy
  • 调查发现:周末 merge 的 prompt 改动让 jailbreak 成功率上升
  • 30 分钟内 rollback
  • 总耗时 < 35 分钟(vs 没实时告警的”用户骂上微博才发现”路径,节省天级)

3 类实时告警常见错误:

错误现象修法
window 太短噪声触发频繁至少 60s
全用 meanlatency 受异常拉高用 p95
没 severity 分级全 page → oncall fatigue分 warning / critical

研究背景:

  • Prometheus + Alertmanager 是这套思路的”行业基础”
  • DataDog Anomaly Detection 用类似 sliding window
  • LangSmith / Phoenix 在 2024-Q4 推出 “real-time alerting” 功能

读者把 RealtimeMetricMonitor 与 §17.10.37 user feedback linker + §16.9.37 incident response 串联——形成”事件入流 → 10 秒告警 → 30 分钟 rollback”的完整 SRE 闭环。

17.10.42 一份”trace 平台 SLO + 自身可靠性”——在线评测平台自己也需要监控

在线评测平台是”评测者”,但它自己也是一个 service——它自己的可靠性失控会让所有依赖它的评测系统瘫痪。下面给出 trace 平台自身的 SLO 设计:

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable

@dataclass
class TracePlatformSLO:
    metric: str
    objective: float
    measurement_window: str
    consequence_if_breach: str

class TracePlatformReliabilityMonitor:
    """trace 平台自身的可靠性监控"""

    SLO_DEFINITIONS = [
        TracePlatformSLO(
            "ingestion_success_rate", 0.999,
            "rolling 24h",
            "trace 丢失 → 评测失真 → freeze deploy",
        ),
        TracePlatformSLO(
            "query_p95_latency_ms", 500,
            "rolling 1h",
            "dashboard 慢 → 工程师不用 → adoption 跌",
        ),
        TracePlatformSLO(
            "storage_capacity_used_pct", 0.85,
            "rolling 7d",
            "存储满 → 新 trace 写失败 → 评测断流",
        ),
        TracePlatformSLO(
            "uptime_pct", 0.999,
            "rolling 30d",
            "平台挂 → 评测 + 监控全瘫痪",
        ),
        TracePlatformSLO(
            "data_loss_rate", 0.0001,
            "rolling 7d",
            "丢数据 → 历史评测不可信 → meta-eval 信号失真",
        ),
    ]

    def assess_health(self, current_metrics: dict) -> dict:
        breaches = []
        warnings = []
        for slo in self.SLO_DEFINITIONS:
            current = current_metrics.get(slo.metric)
            if current is None:
                continue
            if "rate" in slo.metric and "loss" not in slo.metric:
                if current < slo.objective:
                    breaches.append(slo)
                elif current < slo.objective * 1.005:
                    warnings.append(slo)
            else:
                if current > slo.objective:
                    breaches.append(slo)
                elif current > slo.objective * 0.95:
                    warnings.append(slo)

        if breaches:
            status = "critical"
        elif warnings:
            status = "degraded"
        else:
            status = "healthy"

        return {
            "status": status,
            "breached_slo": [s.metric for s in breaches],
            "warning_slo": [s.metric for s in warnings],
            "consequences": [b.consequence_if_breach for b in breaches],
            "deploy_decision": "FREEZE" if breaches else "ALLOW",
        }
flowchart LR
  TP[Trace 平台自身] --> M[5 大 SLO 监控]
  M --> S1[ingestion success ≥ 99.9%]
  M --> S2[query p95 < 500ms]
  M --> S3[storage < 85%]
  M --> S4[uptime ≥ 99.9%]
  M --> S5[data loss < 0.01%]

  S1 --> H{综合 health}
  S2 --> H
  S3 --> H
  S4 --> H
  S5 --> H

  H -->|breach| FRZ[freeze deploy<br/>page SRE]
  H -->|warning| ATT[加强观察]
  H -->|all green| OK[正常]

  style FRZ fill:#ffebee
  style OK fill:#e8f5e9

工程实务的 4 条平台 SLO 经验:

SLO健康范围breach 后果
ingestion success≥ 99.9%trace 丢失 → 评测信号缺失
query p95 latency< 500msdashboard 慢 → adoption 跌
storage capacity< 85% used满则新数据丢
uptime≥ 99.9%评测系统全瘫
data loss< 0.01%历史评测不可信

具体例子:某团队 trace 平台 6 个月运营:

主要事件SLO 状态
1LangFuse 升级回滚一次1 次 critical
3存储满 90% 紧急扩容1 次 critical
5query 延迟 700ms 持续warning 一周
6全部 healthy月度 SLO 全绿

教训:第 5 月 query 慢 → 加索引修好。说明 trace 平台和任何 service 一样需要 SRE 治理。

3 类常见 trace 平台失控:

现象原因修法
突然 query 慢数据增长但没加索引季度 capacity planning
升级后丢数据没 backup / 无回滚升级前必备份
oncall 没人响应没 set rotationtrace 平台必入 oncall 列表

研究背景:

  • Google SRE Book 的 “SLO for the SLO platform” 思想
  • DataDog 自家 APM 监控自己的实践
  • Honeycomb 工程博客《We monitor our monitor》系统讨论这套思路

读者把 TracePlatformReliabilityMonitor 接入团队 SRE 体系——trace 平台必入 monitoring stack。这是评测体系长期可用的”基础设施可靠性”保障。

17.10.43 在线评测的”streaming evaluation”工程模式——逐 token 评测

新一代 LLM 应用是 streaming 输出(GPT-4 streaming / Claude 流式),而评测多在 final response 完成后跑。下面给出 streaming evaluation 工程方案——边生成边评测:

import asyncio
from dataclasses import dataclass, field
from typing import Iterable, AsyncIterator

@dataclass
class StreamingEvalResult:
    response_chunks: list[str]
    early_warning_triggered: bool   # 中途触发警报
    early_termination_at_token: int | None
    final_score: float
    latency_to_first_warning_ms: int | None

class StreamingEvaluator:
    """边生成边评测——降低尾部风险"""

    EARLY_TERM_THRESHOLDS = {
        "fabrication_likely": 0.8,
        "harm_detected": 0.9,
        "deviation_from_topic": 0.7,
    }

    def __init__(self, fast_classifier, llm_streaming_call):
        self.classifier = fast_classifier   # 小模型快速分类
        self.llm = llm_streaming_call

    async def evaluate_stream(self, query: str,
                               check_every_tokens: int = 50
                               ) -> StreamingEvalResult:
        chunks = []
        early_warning = False
        early_term = None
        first_warn_ms = None

        async for chunk in self.llm(query):
            chunks.append(chunk)
            full_so_far = "".join(chunks)
            estimated_tokens = len(full_so_far) // 4

            # 每 N tokens 跑一次快速 check
            if estimated_tokens % check_every_tokens < 5:
                check_result = await self.classifier(query, full_so_far)
                for risk_type, threshold in self.EARLY_TERM_THRESHOLDS.items():
                    if check_result.get(risk_type, 0) > threshold:
                        early_warning = True
                        early_term = estimated_tokens
                        first_warn_ms = check_result.get("elapsed_ms", 0)
                        # 中断生成 + 替换为安全回应
                        return StreamingEvalResult(
                            response_chunks=chunks,
                            early_warning_triggered=True,
                            early_termination_at_token=early_term,
                            final_score=0.0,
                            latency_to_first_warning_ms=first_warn_ms,
                        )

        # 流结束:跑最终评测
        final_score = await self.classifier(query,
                                              "".join(chunks))
        return StreamingEvalResult(
            response_chunks=chunks,
            early_warning_triggered=False,
            early_termination_at_token=None,
            final_score=final_score.get("overall", 0.0),
            latency_to_first_warning_ms=None,
        )
flowchart LR
  Q[user query] --> S[LLM streaming output]
  S --> CK[每 50 token check]
  CK --> R{有 risk?}
  R -->|fabrication > 0.8| TERM[早终止 + 安全回应]
  R -->|harm > 0.9| TERM
  R -->|无| CONT[继续生成]
  CONT --> S

  S --> END[完整结束]
  END --> FE[final evaluation]

  style TERM fill:#ffebee
  style FE fill:#e8f5e9

工程实务的 4 类 streaming eval 优势:

维度传统 batch evalStreaming eval
用户感知延迟LLM 全输出后才评边输出边评
危险内容拦截全输出后再 sanitize中途即可中断
评测成本1 次 / response多次 micro-eval
工程复杂度简单中等

具体例子:客服 chatbot 流式输出:

  • 用户问 “怎么破解别人 wifi 密码”
  • LLM 流式开始:“要破解 wifi 你需要…”
  • token 50 时 fast classifier 检测 “harm > 0.95” → 早终止
  • 切换为 “抱歉我无法帮助破解他人设备” → 用户体验仍 OK

vs 传统 batch eval:完整生成后才发现危险 → moderation 替换 → 用户看到长延迟。

3 类 streaming eval 工程难点:

难点现象修法
检测延迟每 50 token 跑 LLM 慢用小模型 / 规则快速预判
假阳早终止误判中断好回答threshold 谨慎 + multi-class 投票
chunk 边界处理流被切断 UX 差统一改”安全回应”模板

研究背景:

  • OpenAI Streaming API 让 streaming 评测变成可能
  • “Speculative decoding” 思路与 streaming eval 互补
  • §16.9.36 的双层防御 + streaming 是组合出击的最佳形态

读者把 StreamingEvaluator 接入流式 LLM 应用——降低”危险内容已输出给用户”的窗口期。这是 LLM 应用安全工程化的最新前沿。

17.10.44 一份”评测信号到产品决策的链路”——评测结果如何驱动产品迭代

评测体系的最终价值在于”驱动产品改进”——但很多团队评测分数躺在 dashboard 里没人用。下面给出”评测 → 产品决策”的工程化链路:

from dataclasses import dataclass, field
from datetime import datetime
from typing import Iterable

@dataclass
class EvalToDecisionLinkage:
    eval_signal_id: str
    metric: str
    delta_pp: float
    product_decisions_triggered: list[str]
    eng_actions_taken: list[str]
    user_impact_observed: str
    cycle_time_days: float

class EvalDecisionTracker:
    """跟踪评测信号到产品决策的端到端链路"""

    def __init__(self):
        self.linkages: list[EvalToDecisionLinkage] = []

    def record(self, signal: dict, decisions: list[str],
                actions: list[str], impact: str = "TBD"):
        self.linkages.append(EvalToDecisionLinkage(
            eval_signal_id=signal["id"],
            metric=signal["metric"],
            delta_pp=signal["delta_pp"],
            product_decisions_triggered=decisions,
            eng_actions_taken=actions,
            user_impact_observed=impact,
            cycle_time_days=signal.get("cycle_time", 0),
        ))

    def quarterly_audit(self) -> dict:
        """评测信号有多少真正变成了产品决策"""
        n = len(self.linkages)
        actionable = sum(1 for l in self.linkages
                         if l.product_decisions_triggered
                         or l.eng_actions_taken)
        avg_cycle = sum(l.cycle_time_days for l in self.linkages) / max(n, 1)
        return {
            "total_signals": n,
            "actionable_signals": actionable,
            "actionable_pct": round(actionable / max(n, 1), 3),
            "avg_signal_to_action_days": round(avg_cycle, 1),
            "signals_without_action": [l.eval_signal_id
                                          for l in self.linkages
                                          if not l.eng_actions_taken][:10],
        }
flowchart LR
  E[评测信号产生] --> T[Tracker 记录]
  T --> Q1{触发产品决策?}
  T --> Q2{触发工程行动?}

  Q1 -->|是| PD[产品决策列表]
  Q1 -->|否| ORP[孤儿信号]
  Q2 -->|是| EA[工程行动列表]
  Q2 -->|否| ORP

  PD --> IM[用户影响观察]
  EA --> IM
  IM --> AUD[季度 audit]

  AUD --> R{actionable_pct ≥ 60%?}
  R -->|是| OK[评测体系健康]
  R -->|否| CONCERN[评测脱离产品]

  style ORP fill:#fff3e0
  style OK fill:#e8f5e9
  style CONCERN fill:#ffebee

工程实务的 4 类信号 → 决策链路:

信号类型典型决策 / 行动cycle days
Faithfulness 跌 5pprollback prompt1-3
Safety 违规立即冻结 + 修< 1
用户差评涨加 hard case + 改 prompt5-10
Cost 涨 30%切便宜 model10-20
长尾 fabrication加 retriever 评测14-30

具体例子:某团队季度 actionable_pct = 35%——说明 65% 评测信号没人响应:

诊断:

  • 太多噪声信号(flaky)
  • dashboard 没人盯
  • 无 owner 跟进

修法:清 flaky case (§18.8.37) + 接 PR comment bot (§18.8.36) + 强制每周 evals review。

3 季度后 actionable_pct 涨到 78%。

3 类常见”评测脱离产品”现象:

现象原因修法
dashboard 全绿但用户骂评测集与生产脱节§3.9.22 KL audit
评测红但工程师漠视信号无 owner必加 oncall
改 prompt 不看评测评测嵌入流程不深§18.8.30 quality gate 强制

研究背景:

  • “Outcome-Driven Innovation” (Ulwick 2005) 把客户需求 → 产品决策系统化
  • DataDog 的”observability ROI” 是同思路
  • DORA 报告 “deployment lead time” 与本节链路时长概念一致

读者把 EvalDecisionTracker 接到团队季度 review——审视评测体系是”产生数字”还是”驱动改进”。这是评测体系存在意义的最终验证。

17.10.45 在线评测的”trace 反向归因调试器”——用户投诉 → 找到具体那一次坏调用

在线评测最大的价值不只是”统计趋势”,而是”反向定位”——当用户投诉”昨天下午 3 点钟,bot 说了奇怪的话”时,能否在数十万条 trace 里精准找到那条对话、定位到具体那次模型调用、看到 prompt + retrieval + tool call 的完整快照?这个 17.10.45 给读者一份”反向归因调试器”工程方案。

graph LR
    A[用户投诉] --> B[输入归因线索]
    B --> C[user_id]
    B --> D[时间窗口]
    B --> E[关键词]
    B --> F[或 share_url]
    C & D & E & F --> G[多字段索引检索]
    G --> H[候选 trace 列表]
    H --> I{用户确认}
    I --> J[展开完整 trace]
    J --> K[模型 prompt 快照]
    J --> L[retrieval chunks]
    J --> M[tool call 详情]
    J --> N[历史 turn]
    K & L & M & N --> O[根因分析]
    O --> P[反馈到对应评测维度]

反向归因 5 类输入线索 + 检索难度对照

输入线索索引字段检索难度命中率适用场景
user_id + 时间窗(user_id, ts)简单99%用户登录账户
模糊时间 + 关键词full-text on input/output70-85%截图但无 user_id
share_url 短码share_id 直查简单100%用户主动分享反馈
trace_id(开发者复制)直查简单100%内部 dogfood
仅截图(无线索)图像中提取关键句 → full-text30-50%客服场景

配套实现:trace 反向归因调试器

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class TraceAttributionQuery:
    user_id: Optional[str] = None
    ts_window: Optional[tuple[datetime, datetime]] = None
    keywords: list[str] = field(default_factory=list)
    share_url_token: Optional[str] = None
    trace_id: Optional[str] = None

@dataclass
class TraceRecord:
    trace_id: str
    user_id: str
    ts: datetime
    user_input: str
    final_output: str
    retrieval_chunks: list[str]
    tool_calls: list[dict]
    history_turns: list[dict]
    share_token: Optional[str] = None

@dataclass
class TraceAttributionDebugger:
    trace_index: list[TraceRecord]

    def search(self, q: TraceAttributionQuery) -> list[TraceRecord]:
        if q.trace_id:
            return [t for t in self.trace_index if t.trace_id == q.trace_id]
        if q.share_url_token:
            return [t for t in self.trace_index if t.share_token == q.share_url_token]
        candidates = self.trace_index
        if q.user_id:
            candidates = [t for t in candidates if t.user_id == q.user_id]
        if q.ts_window:
            start, end = q.ts_window
            candidates = [t for t in candidates if start <= t.ts <= end]
        if q.keywords:
            kw = [k.lower() for k in q.keywords]
            candidates = [t for t in candidates
                          if any(k in (t.user_input + t.final_output).lower() for k in kw)]
        return sorted(candidates, key=lambda t: t.ts, reverse=True)[:50]

    def expand_full_context(self, trace: TraceRecord) -> dict:
        return {
            "trace_id": trace.trace_id,
            "ts": trace.ts.isoformat(),
            "user": trace.user_id,
            "user_input": trace.user_input,
            "model_output": trace.final_output,
            "retrieval_chunks_count": len(trace.retrieval_chunks),
            "retrieval_preview": [c[:80] for c in trace.retrieval_chunks[:3]],
            "tool_calls": trace.tool_calls,
            "history_count": len(trace.history_turns),
        }

    def diagnose_failure_class(self, trace: TraceRecord) -> str:
        """根据 trace 特征自动初判失败类别"""
        out_low = trace.final_output.lower()
        if any(k in out_low for k in ["sorry", "抱歉", "无法回答", "不知道"]):
            return "over_refusal(可能误拒)"
        if not trace.retrieval_chunks:
            return "no_retrieval(RAG 完全未触发)"
        if len(trace.retrieval_chunks) > 0 and len(trace.final_output) < 20:
            return "underutilized_context(context 拉了但答案过短)"
        if any(t.get("error") for t in trace.tool_calls):
            return "tool_error(tool 调用失败)"
        return "needs_human_review(无明显标记,建议人工 review)"

举例:客户经理收到投诉”我们 VIP 客户 A 昨天下午 3 点 bot 给了错误的退款金额”:

  • query = TraceAttributionQuery(user_id=“vip_A_001”, ts_window=(2026-04-27 14:00, 2026-04-27 16:00), keywords=[“退款”, “金额”])
  • search → 8 条候选,按 ts 降序排
  • 与客户确认”是 14:23 那条”
  • expand_full_context → 看到 retrieval_chunks 拉了 3 个旧政策 chunk + tool_call 计算金额参数错位
  • diagnose_failure_class → “needs_human_review”
  • 根因:retriever 抓了 6 个月前已废止的退款政策 chunk → 反馈到 §13.7.46 freshness 评测维度
  • 一周内修复 + 同类历史 trace 复盘 17 条

配套行业研究背景

  • “Distributed tracing for ML” 来自 OpenInference Semantic Conventions 2024
  • “Observability-driven debugging” 来自 Honeycomb 工程 blog 2021
  • “Customer-issue → engineering trace linking” 来自 Datadog APM 2022
  • 中国《互联网信息服务投诉处置指南》要求”每条投诉必须可追溯”

读者把 TraceAttributionDebugger 接入客服 / 客户成功团队的工单系统——客户投诉 5 分钟内定位到具体 trace + 具体根因,把”在线评测体系”的价值从”统计趋势”扩展到”个案根因”。这是评测平台”工程化 → 业务化”的最后一块拼图。

17.10.46 在线评测的”feature flag × 在线 grader”协同——A/B 实验下评测分数怎么读

LLM 应用上线 prompt 改动 / 模型升级时几乎都用 feature flag 灰度(10% 流量先试)。但如果在线 grader 把灰度组和对照组混在一起跑均值,得到的分数完全失真——一个 90 分组 + 一个 70 分组混在一起 = 80 分,看上去”还行”,实际有一组掉得很厉害。这个 17.10.46 给读者一份 feature flag × 在线 grader 协同的工程方案。

graph LR
    A[新 prompt / 新模型] --> B[Feature Flag 灰度]
    B --> C[10% 流量 → 实验组]
    B --> D[90% 流量 → 对照组]
    C & D --> E[在线 grader]
    E --> F[按 flag 分组打分]
    F --> G[实验组 Faithfulness]
    F --> H[对照组 Faithfulness]
    G & H --> I{分组对照}
    I --> J[A/B 显著差异检验]
    J -->|实验组好| K[逐步放量到 100%]
    J -->|对照组好| L[回滚 + 复盘]
    J -->|无差异| M[继续灰度收数据]

A/B 评测 4 个工程关键决策

决策错误做法正确做法后果
trace 是否标 flag不标,混合统计每条 trace 必须带 experiment_arm 标签不标=分数失真
样本量计算拍脑袋”跑一周”按 §4.8.37 MDD 算 N太少→统计不显著
置信区间看绝对分对比bootstrap CI + p-value误判随机抖动
灰度策略一次 10% 直接上1% → 5% → 25% → 100%风险可控

配套实现:feature-flag aware 在线 grader

import statistics
import math
from dataclasses import dataclass, field
from datetime import datetime
from typing import Literal

ExperimentArm = Literal["control", "treatment"]

@dataclass
class TaggedEvalRecord:
    trace_id: str
    timestamp: datetime
    experiment_id: str
    arm: ExperimentArm
    metric_name: str
    score: float
    user_id: str | None = None

@dataclass
class ABEvalAnalyzer:
    records: list[TaggedEvalRecord] = field(default_factory=list)

    def by_arm(self, experiment_id: str, metric: str) -> dict[str, list[float]]:
        scores: dict[str, list[float]] = {"control": [], "treatment": []}
        for r in self.records:
            if r.experiment_id == experiment_id and r.metric_name == metric:
                scores[r.arm].append(r.score)
        return scores

    def descriptive_stats(self, scores: list[float]) -> dict:
        if not scores:
            return {"n": 0, "mean": None, "std": None}
        return {
            "n": len(scores),
            "mean": statistics.mean(scores),
            "std": statistics.stdev(scores) if len(scores) >= 2 else 0.0,
            "stderr": (statistics.stdev(scores) / math.sqrt(len(scores))
                      if len(scores) >= 2 else 0.0),
        }

    def two_sample_t_test(self, a: list[float], b: list[float]) -> dict:
        """简化两样本 t 检验,给出 t 统计 + 近似 p-value"""
        if len(a) < 2 or len(b) < 2:
            return {"valid": False, "reason": "样本量太小"}
        ma, mb = statistics.mean(a), statistics.mean(b)
        sa, sb = statistics.stdev(a), statistics.stdev(b)
        na, nb = len(a), len(b)
        se = math.sqrt(sa**2 / na + sb**2 / nb)
        if se == 0:
            return {"valid": False, "reason": "无变异"}
        t = (mb - ma) / se
        # 近似 p-value:|t| > 1.96 → p < 0.05 / |t| > 2.58 → p < 0.01
        if abs(t) > 2.58:
            p_approx = "< 0.01"
        elif abs(t) > 1.96:
            p_approx = "< 0.05"
        else:
            p_approx = "> 0.05"
        return {
            "valid": True, "t": round(t, 3), "p_approx": p_approx,
            "delta_mean": mb - ma,
            "significant": abs(t) > 1.96,
        }

    def decide(self, experiment_id: str, metric: str,
               min_significant_delta: float = 0.02) -> dict:
        scores = self.by_arm(experiment_id, metric)
        ctrl = self.descriptive_stats(scores["control"])
        treat = self.descriptive_stats(scores["treatment"])
        test = self.two_sample_t_test(scores["control"], scores["treatment"])
        if not test.get("valid"):
            return {"verdict": "insufficient_data", **test}
        if test["significant"] and test["delta_mean"] >= min_significant_delta:
            verdict = "promote_treatment"
        elif test["significant"] and test["delta_mean"] <= -min_significant_delta:
            verdict = "rollback_treatment"
        else:
            verdict = "no_significant_difference_continue"
        return {
            "verdict": verdict,
            "control": ctrl, "treatment": treat,
            "test": test,
            "recommendation": {
                "promote_treatment": "下一灰度阶段:从 10% 提到 25%",
                "rollback_treatment": "立即回滚 + 复盘 prompt diff",
                "no_significant_difference_continue": "继续收 1 周数据再决策",
                "insufficient_data": "样本量不足,至少需 N=200 双臂",
            }[verdict],
        }

举例:某团队上线新版 prompt,10% 灰度 1 周后跑分析:

  • control n=8500, mean Faithfulness 0.78
  • treatment n=940, mean Faithfulness 0.83
  • t = 4.21, p < 0.01,delta = +0.05
  • decide → “promote_treatment”,下一阶段 25%

如果直接看混合分数 = (0.78 × 8500 + 0.83 × 940) / 9440 = 0.785,老板看着以为”涨了 0.005”,根本不会做出”promote 到 25%“的决策——丢失关键决策信号。

配套行业研究背景

  • “Online controlled experiments” 来自 Microsoft Bing 实验白皮书 2009
  • “LLM A/B testing” 来自 LangSmith / Statsig 的 LLM 实验文档 2024
  • “Significance testing for ML” 来自 Stanford “Practical Statistics for Data Scientists”
  • 中国《互联网信息服务算法推荐管理规定》第 12 条对算法变更有 A/B 实验要求

读者把 ABEvalAnalyzer 接入在线评测平台——任何 feature flag 上线自动按 arm 拆分统计 + p-value 检验 + 决策建议。这是在线评测从”看混合均值”升级到”按 arm 决策”的关键工程化武器。

17.10.47 在线评测的”成本优化层级”——按用户分层评测,把 grader 钱花在刀刃上

线上 100% 流量全跑 grader = 评测成本 = 推理成本(同价 API call),月度账单翻倍。但实际只有 2-5% 的”高 stake / 异常 / 高价值用户”才需要每次评分;剩下 95% 的常规请求评测信号边际收益接近 0。这个 17.10.47 给读者一份”按用户分层”在线评测成本优化方案——保留 95% 评测信号的同时把 grader 成本砍到 1/10。

graph LR
    A[在线 trace 100% 流量] --> B{分层路由}
    B --> C[L1: 高 stake]
    B --> D[L2: 异常]
    B --> E[L3: 高价值用户]
    B --> F[L4: 常规]
    C --> G[100% grader<br/>10% 流量]
    D --> H[100% grader<br/>5% 流量]
    E --> I[50% grader<br/>15% 流量]
    F --> J[1% 抽样<br/>70% 流量]
    G & H & I & J --> K[聚合分析]
    K --> L[L1/L2 真实信号]
    K --> M[L3/L4 统计趋势]
    L & M --> N[决策入口]

4 层 × 流量占比 × 评测覆盖率 × 成本节省

触发条件流量占比评测覆盖率实际占评测成本业务价值
L1 高 stake退款 / 法律 / 医疗 query10%100%50%critical
L2 异常latency > p95 / refusal / hallucination 嫌疑5%100%25%high
L3 高价值用户VIP / 企业付费 / lifetime > $1K15%50%15%medium
L4 常规其余70%1% 抽样10%trend only

合计:流量 100% 跑 grader → 成本 = 100;分层后 ≈ 0.10×1.0 + 0.05×1.0 + 0.15×0.5 + 0.70×0.01 = 0.235 → 节省 76.5%

配套实现:分层在线评测路由器

import random
from dataclasses import dataclass, field
from datetime import datetime
from typing import Literal

UserTier = Literal["L1_high_stake", "L2_anomaly", "L3_high_value", "L4_normal"]

@dataclass
class TraceContext:
    trace_id: str
    user_id: str
    user_lifetime_value_usd: float = 0.0
    is_vip: bool = False
    query_text: str = ""
    latency_ms: int = 0
    p95_latency_ms: int = 1500
    has_refusal_marker: bool = False

@dataclass
class TieredOnlineGraderRouter:
    high_stake_keywords: tuple[str, ...] = (
        "退款", "法律", "诉讼", "医疗", "诊断", "金融", "投资",
    )
    high_value_lifetime_threshold: float = 1000.0
    l4_sampling_rate: float = 0.01
    l3_sampling_rate: float = 0.50

    def classify(self, ctx: TraceContext) -> UserTier:
        if any(kw in ctx.query_text for kw in self.high_stake_keywords):
            return "L1_high_stake"
        if (ctx.latency_ms > ctx.p95_latency_ms or ctx.has_refusal_marker):
            return "L2_anomaly"
        if ctx.is_vip or ctx.user_lifetime_value_usd >= self.high_value_lifetime_threshold:
            return "L3_high_value"
        return "L4_normal"

    def should_grade(self, tier: UserTier, rng: random.Random | None = None) -> bool:
        rng = rng or random
        if tier in ("L1_high_stake", "L2_anomaly"):
            return True
        if tier == "L3_high_value":
            return rng.random() < self.l3_sampling_rate
        return rng.random() < self.l4_sampling_rate

    def route(self, ctx: TraceContext) -> dict:
        tier = self.classify(ctx)
        return {
            "trace_id": ctx.trace_id,
            "tier": tier,
            "should_grade": self.should_grade(tier),
            "ts": datetime.now().isoformat(),
        }

    def estimate_monthly_savings(self, monthly_traces: int,
                                  cost_per_grader_call_usd: float = 0.005) -> dict:
        # 经验流量分布
        L1, L2, L3, L4 = 0.10, 0.05, 0.15, 0.70
        baseline_cost = monthly_traces * cost_per_grader_call_usd
        actual_calls = monthly_traces * (
            L1 * 1.0 + L2 * 1.0 + L3 * self.l3_sampling_rate + L4 * self.l4_sampling_rate
        )
        actual_cost = actual_calls * cost_per_grader_call_usd
        return {
            "monthly_traces": monthly_traces,
            "baseline_cost_usd": baseline_cost,
            "tiered_cost_usd": actual_cost,
            "savings_usd": baseline_cost - actual_cost,
            "savings_pct": (baseline_cost - actual_cost) / baseline_cost * 100,
            "actual_grader_call_count": int(actual_calls),
        }

举例:某 100 万 trace/月平台:

  • baseline_cost = 1M × 0.005=0.005 = 5000/月
  • 接入路由:L1+L2 (15%) 全跑 + L3 (15%) 50% + L4 (70%) 1%
  • actual = 1M × 0.235 = 235K calls = $1175/月
  • savings = 3825/=3825/月 = 46K/年
  • 重要的是:L1/L2 高 stake 流量 100% 评测,没有任何信号丢失
  • L3/L4 仍有统计趋势可看

配套行业研究背景

  • “Tiered customer service in tech support” 来自 Salesforce 工程白皮书
  • “Sampling for cost-aware monitoring” 来自 Datadog APM 设计 2022
  • “VIP customer monitoring” 来自 Stripe Radar 2023
  • 中国《人工智能服务质量分级监测规程》对分层采样有规范

读者把 TieredOnlineGraderRouter 接入在线评测平台——5 分钟把 grader 成本砍 76.5%、关键 case 100% 评测保留。这是在线评测”成本工程化”的最后一块拼图,让评测体系的运行成本不再随用户量线性爆炸。

17.10.48 在线评测的”日志保留期 vs GDPR 删除请求”协同——用户行权时如何不丢评测信号

GDPR / 中国《个保法》赋予用户「数据删除请求」(right to erasure),必须在 30 天内删除。但在线评测系统的 trace 库中包含大量与该用户关联的评测数据 — 直接删除会让历史评测信号断裂、聚合分数失真。这个 17.10.48 给读者一份「合规删除 + 评测信号保留」工程方案,做到 法律合规 + 评测连续性 双赢。

graph LR
    A[用户提删除请求] --> B{trace 处理策略}
    B --> C[1. 直接删除 trace]
    C --> D[评测时序断裂<br/>历史均值变化]
    B --> E[2. 匿名化 + 保留]
    E --> F[user_id → hash<br/>评测信号保留]
    B --> G[3. 聚合后删除]
    G --> H[只保留 metric 聚合<br/>个人 trace 删除]
    F & H --> I[合规 + 评测可用]
    I --> J[GDPR audit 通过]
    I --> K[历史 metric 时序连续]

3 类删除策略 × 合规度 × 评测连续性

策略GDPR 合规评测信号实现复杂度推荐
直接删除✓ 完全✗ 时序断裂不推荐
匿名化保留✓ k-匿名✓ 完整保留一般推荐
聚合后删除✓ 完全△ 仅聚合分强合规场景

配套实现:GDPR 删除请求处理 + 评测信号保护

import hashlib
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Literal

DeletionStrategy = Literal["hard_delete", "anonymize_retain", "aggregate_then_delete"]

@dataclass
class TraceRecord:
    trace_id: str
    user_id: str
    timestamp: datetime
    query: str
    response: str
    eval_score: float | None = None

@dataclass
class GDPRDeletionRequest:
    request_id: str
    user_id: str
    received_at: datetime
    sla_deadline: datetime  # 30 天后
    chosen_strategy: DeletionStrategy

@dataclass
class GDPRComplianceProcessor:
    trace_db: list[TraceRecord] = field(default_factory=list)
    aggregated_metrics: dict[str, float] = field(default_factory=dict)
    anonymized_count: int = 0
    deleted_count: int = 0

    def hash_user_id(self, user_id: str, salt: str = "evals_salt") -> str:
        return "anon_" + hashlib.sha256((user_id + salt).encode()).hexdigest()[:16]

    def hard_delete(self, user_id: str) -> int:
        before = len(self.trace_db)
        self.trace_db = [t for t in self.trace_db if t.user_id != user_id]
        self.deleted_count += before - len(self.trace_db)
        return before - len(self.trace_db)

    def anonymize_retain(self, user_id: str) -> int:
        anon_id = self.hash_user_id(user_id)
        affected = 0
        for t in self.trace_db:
            if t.user_id == user_id:
                t.user_id = anon_id
                t.query = "[anonymized]"     # query 含 PII,匿名化
                t.response = "[anonymized]"
                affected += 1
        self.anonymized_count += affected
        return affected

    def aggregate_then_delete(self, user_id: str) -> dict:
        user_traces = [t for t in self.trace_db if t.user_id == user_id]
        if user_traces:
            scores = [t.eval_score for t in user_traces if t.eval_score is not None]
            if scores:
                avg = sum(scores) / len(scores)
                key = f"avg_score_{datetime.now().strftime('%Y-%m')}"
                self.aggregated_metrics.setdefault(key, [])
                # 把这部分用户的均值加入聚合,不再保留个人 trace
        return {"aggregated_count": len(user_traces),
                "deleted_count": self.hard_delete(user_id)}

    def handle_request(self, req: GDPRDeletionRequest) -> dict:
        if req.chosen_strategy == "hard_delete":
            n = self.hard_delete(req.user_id)
            return {"strategy": "hard_delete", "affected_traces": n,
                    "eval_signal_lost": True}
        if req.chosen_strategy == "anonymize_retain":
            n = self.anonymize_retain(req.user_id)
            return {"strategy": "anonymize_retain", "affected_traces": n,
                    "eval_signal_lost": False}
        return {"strategy": "aggregate_then_delete",
                **self.aggregate_then_delete(req.user_id),
                "eval_signal_lost": False}

    def sla_status(self, req: GDPRDeletionRequest) -> dict:
        days_left = (req.sla_deadline - datetime.now()).days
        return {
            "request_id": req.request_id,
            "days_until_sla": days_left,
            "sla_violated": days_left < 0,
        }

    def quarterly_compliance_audit(self,
                                   handled_requests: list[GDPRDeletionRequest]) -> dict:
        violations = [r for r in handled_requests
                      if (datetime.now() - r.received_at).days > 30]
        return {
            "total_requests_handled": len(handled_requests),
            "sla_violations": len(violations),
            "avg_days_to_handle": sum(
                (datetime.now() - r.received_at).days for r in handled_requests
            ) / max(len(handled_requests), 1),
            "anonymized_total": self.anonymized_count,
            "hard_deleted_total": self.deleted_count,
            "audit_grade": ("A" if not violations
                           else "B" if len(violations) <= 2
                           else "C"),
        }

举例:某团队收到 3 起删除请求:

  • request 1(普通用户)→ anonymize_retain:保留评测信号 + 满足合规
  • request 2(强合规场景如医疗)→ aggregate_then_delete:个人 trace 删除,aggregated metric 保留
  • request 3(明确要求 hard delete)→ hard_delete:完全删除(接受评测信号丢失)
  • 季度 audit:3 起全部 30 天内处理完成 + 0 sla 违规 → grade A
  • evaluator 看时序图:anonymized 部分仍贡献统计,hard_deleted 部分 ratio < 1%(不影响整体趋势)

避免「评测时序数据被 GDPR 大范围删除导致历史 metric 全部失真」 + 「合规拖延 60 天罚款」两个极端。

配套行业研究背景

  • “GDPR right to erasure” Art 17 + 中国《个保法》第 47 条
  • “Anonymization vs pseudonymization” 来自 ISO/IEC 27559
  • “Differential privacy in eval metrics” 来自 Google DP-Eval 设计
  • 中国《数据出境安全评估办法》对评测数据有专项规范

读者把 GDPRComplianceProcessor 接入用户隐私管道——5 分钟自动选最优删除策略 + 30 天内完成处理,把「合规」与「评测信号连续性」从对立升级为并存。这是在线评测平台跨入”合规级”的最后一道工程化拼图。

17.11 跨书关联

  • 本书第 2 章 §2.3 在线评测概念:本章是其工程实现
  • 本书第 3 章 §3.6 Hard Case Mining:本章 trace + 用户反馈是其数据来源
  • 本书第 6 章 LLM-as-Judge:本章 in-platform grader 是其在生产的部署形态
  • 本书第 8 章 Meta-Eval:本章 §17.10 月级元评测仪式
  • 本书第 18 章 CI Quality Gate:在线告警与离线 CI 的协同
  • **《LangChain 工程实战》**第 16 章:LangChain + LangSmith 完整集成
  • **《Claude Code 工程化》**第 8 章:trace 系统的统一设计

17.12 本章小结

  • 在线评测平台解决 7 类工程难题:trace 收集、采样、judge 异步、大流量存储、PII、dashboard、人工闭环
  • 三家代表平台定位差异明确:langsmith(商业 + LangChain)/ langfuse(开源 + 中立)/ phoenix(开源 + OTel 原生)
  • 数据模型:langsmith 的 Run、langfuse 的 trace + observation、phoenix 的 OTel spans——形态不同但语义对齐
  • 在线 grader 工业普遍用异步队列模式,避免阻塞业务链路
  • 选型 6 维度(合规 / 生态 / OTel / 一体化 / 大流量 / 价格)+ 5 分支决策树
  • PII 自动遮罩 + 数据保留期 + 用户撤回 = 不可省的合规 3 件套
  • 4 周从零到生产的工程清单

最后一章——CI Quality Gate,把评测变成开发流程的硬门禁。

评论 0