ai-tracing-requests
# See What Your AI Did on a Specific Request
Guide the user through tracing and debugging individual AI requests. The goal: for any request, see every LM call, retrieval step, intermediate result, token count, and latency.
## When you need this
- A customer reports a wrong answer — you need to see exactly what happened
- Your pipeline is slow — you need to find which step is the bottleneck
- Compliance requires audit trails of every AI decision
- QA wants to inspect AI behavior before launch
- You're debugging why an agent took unexpected actions
## How it's different from monitoring

| | Monitoring | Tracing (this skill) |
|---|---|---|
| Scope | Aggregate health across all requests | Single request, full detail |
| Question answered | "Is accuracy dropping this week?" | "Why did customer #12345 get a wrong answer at 2:14pm?" |
| Output | Scores, trends, alerts | Call traces, intermediate results, latencies |
| Timing | Periodic batch evaluation | Per-request, real-time |
## Step 1: Understand the need
Quick decision tree:

```
What are you debugging?
|
+- A specific wrong answer right now?
|    -> Step 2: Quick debugging with dspy.inspect_history
|
+- Need to trace requests in a running app?
|    -> Step 3-4: Add per-step tracing
|
+- Need a visual trace viewer for your team?
|    -> Step 5: Connect Langtrace, Phoenix, or Jaeger
|
+- Need to find patterns across many traces?
     -> Step 6: Search and filter traces
```

## Step 2: Quick debugging (no extra tools needed)
### Inspect the last LM calls
The fastest way to see what happened:

```python
import dspy

# Run your program
result = my_program(question="What is our refund policy?")

# See the last 5 LM calls — shows full prompts and responses
dspy.inspect_history(n=5)
```

This shows:
- The full prompt sent to the LM (including system message, few-shot examples, input)
- The LM's raw response
- How DSPy parsed the response into fields

### Time individual steps
```python
import time

result = my_program(question="test")

# Quick manual timing
start = time.time()
step1_result = my_program.step1(question="test")
step1_time = time.time() - start
print(f"Step 1: {step1_time:.2f}s")

start = time.time()
step2_result = my_program.step2(context=step1_result.context, question="test")
step2_time = time.time() - start
print(f"Step 2: {step2_time:.2f}s")
```

### JSONL trace logging
For persistent traces without any extra dependencies:

```python
import json
import time
from datetime import datetime

class TracedProgram(dspy.Module):
    """Wraps any DSPy program to log per-step traces to JSONL."""
    def __init__(self, program, log_path="traces.jsonl"):
        super().__init__()
        self.program = program
        self.log_path = log_path

    def forward(self, **kwargs):
        trace_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        start = time.time()
        result = self.program(**kwargs)
        total_time = time.time() - start

        # Log the trace
        entry = {
            "trace_id": trace_id,
            "timestamp": datetime.now().isoformat(),
            "inputs": {k: str(v) for k, v in kwargs.items()},
            "outputs": {k: str(getattr(result, k, "")) for k in result.keys()},
            "total_latency_ms": round(total_time * 1000),
        }
        with open(self.log_path, "a") as f:
            f.write(json.dumps(entry) + "\n")
        return result

# Use it
traced = TracedProgram(my_program)
result = traced(question="How do refunds work?")
```
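Because each request is appended as one JSON line, the trace file can be tailed or parsed line by line. A minimal sketch with two hypothetical entries (field values are illustrative):

```python
import json

# Two hypothetical lines, shaped like the entries TracedProgram writes
lines = [
    '{"trace_id": "t1", "inputs": {"question": "How do refunds work?"}, "total_latency_ms": 812}',
    '{"trace_id": "t2", "inputs": {"question": "What is the SLA?"}, "total_latency_ms": 2430}',
]
entries = [json.loads(line) for line in lines]

# Total time spent across both requests
print(sum(e["total_latency_ms"] for e in entries))  # 3242
```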
## Step 3: Per-step tracing in pipelines
For multi-step pipelines, trace each stage separately to see exactly where things go wrong:

```python
import json
import time
import uuid
from datetime import datetime

class StepTracer:
    """Collects per-step timing and intermediate results."""
    def __init__(self):
        self.steps = []
        self.trace_id = str(uuid.uuid4())[:8]

    def trace_step(self, name, func, **kwargs):
        """Run a step and record its inputs, outputs, and latency."""
        start = time.time()
        result = func(**kwargs)
        latency = time.time() - start
        self.steps.append({
            "step": name,
            "inputs": {k: str(v)[:200] for k, v in kwargs.items()},
            "outputs": {k: str(getattr(result, k, ""))[:200] for k in result.keys()},
            "latency_ms": round(latency * 1000),
        })
        return result

    def summary(self):
        """Print a summary of all traced steps."""
        print(f"Trace {self.trace_id}:")
        total = sum(s["latency_ms"] for s in self.steps)
        for step in self.steps:
            pct = step["latency_ms"] / total * 100 if total > 0 else 0
            print(f"  {step['step']}: {step['latency_ms']}ms ({pct:.0f}%)")
        print(f"  Total: {total}ms")

    def to_dict(self):
        return {
            "trace_id": self.trace_id,
            "timestamp": datetime.now().isoformat(),
            "steps": self.steps,
            "total_latency_ms": sum(s["latency_ms"] for s in self.steps),
        }
```
Use it in a pipeline:

```python
class TracedRAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.answer = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        tracer = StepTracer()
        retrieval = tracer.trace_step("retrieve", self.retrieve, query=question)
        answer = tracer.trace_step(
            "answer", self.answer,
            context=retrieval.passages, question=question,
        )
        tracer.summary()
        # Trace a1b2c3d4:
        #   retrieve: 120ms (15%)
        #   answer: 680ms (85%)
        #   Total: 800ms
        return answer
```

### Save traces for later analysis
```python
def save_trace(tracer, path="traces.jsonl"):
    with open(path, "a") as f:
        f.write(json.dumps(tracer.to_dict()) + "\n")

# Load and analyze traces
def load_traces(path="traces.jsonl"):
    with open(path) as f:
        return [json.loads(line) for line in f]

def find_slow_traces(traces, threshold_ms=2000):
    return [t for t in traces if t["total_latency_ms"] > threshold_ms]

def find_failed_steps(traces):
    return [
        t for t in traces
        if any("error" in str(s.get("outputs", "")).lower() for s in t["steps"])
    ]
```
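For instance, running those filters over two hypothetical trace entries (shaped like `StepTracer.to_dict()` output, with made-up values) flags the slow one and the failed one:

```python
# Two hypothetical traces, shaped like StepTracer.to_dict() output
traces = [
    {"trace_id": "a1", "total_latency_ms": 800,
     "steps": [{"step": "retrieve", "outputs": {"passages": "..."}, "latency_ms": 120}]},
    {"trace_id": "b2", "total_latency_ms": 3200,
     "steps": [{"step": "answer", "outputs": {"answer": "Error: upstream timeout"}, "latency_ms": 3100}]},
]

# Same logic as find_slow_traces / find_failed_steps above
slow = [t for t in traces if t["total_latency_ms"] > 2000]
failed = [
    t for t in traces
    if any("error" in str(s.get("outputs", "")).lower() for s in t["steps"])
]
print([t["trace_id"] for t in slow])    # ['b2']
print([t["trace_id"] for t in failed])  # ['b2']
```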
## Step 4: OpenTelemetry instrumentation
For production tracing with any backend (Jaeger, Zipkin, Datadog, etc.):

```python
import json
import time

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Setup — do this once at app startup
provider = TracerProvider()
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("my-ai-app")

class OTelTracedProgram(dspy.Module):
    """Wraps a DSPy program with OpenTelemetry spans."""
    def __init__(self, program):
        super().__init__()
        self.program = program

    def forward(self, **kwargs):
        with tracer.start_as_current_span("ai_request") as span:
            span.set_attribute("ai.inputs", json.dumps({k: str(v) for k, v in kwargs.items()}))
            start = time.time()
            result = self.program(**kwargs)
            latency = time.time() - start
            span.set_attribute("ai.latency_ms", round(latency * 1000))
            span.set_attribute("ai.outputs", json.dumps(
                {k: str(getattr(result, k, "")) for k in result.keys()}
            ))
            return result
```

### Trace individual pipeline steps with OTel
```python
class OTelTracedRAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.answer = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        with tracer.start_as_current_span("rag_pipeline") as parent:
            parent.set_attribute("question", question)
            with tracer.start_as_current_span("retrieve"):
                retrieval = self.retrieve(query=question)
            with tracer.start_as_current_span("generate_answer"):
                answer = self.answer(
                    context=retrieval.passages, question=question
                )
            return answer
```

## Step 5: Connect a trace viewer
### Option A: Langtrace (best DSPy integration)
First-class DSPy auto-instrumentation — one line to trace all LM calls:
```bash
pip install langtrace-python-sdk
```

```python
from langtrace_python_sdk import langtrace

langtrace.init(api_key="your-key")  # or use LANGTRACE_API_KEY env var

# That's it — all DSPy calls are now traced automatically
result = my_program(question="test")

# View traces at app.langtrace.ai
```
### Option B: Arize Phoenix (open-source, self-hosted)
```bash
pip install arize-phoenix openinference-instrumentation-dspy
```

```python
import phoenix as px
from openinference.instrumentation.dspy import DSPyInstrumentor

# Launch local trace viewer
px.launch_app()  # Opens at http://localhost:6006

# Auto-instrument DSPy
DSPyInstrumentor().instrument()

# All DSPy calls are now traced
result = my_program(question="test")
```
### Option C: Jaeger (open-source, Docker)
```bash
docker run -d -p 16686:16686 -p 4317:4317 jaegertracing/all-in-one:latest
```

```python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Export spans to Jaeger
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))

# View traces at http://localhost:16686
```
### Comparison
| Feature | Langtrace | Arize Phoenix | Jaeger |
|---|---|---|---|
| DSPy auto-instrumentation | Yes (built-in) | Yes (plugin) | Manual |
| Setup effort | One line | Two lines + Docker | Docker + manual spans |
| Self-hosted option | Yes | Yes | Yes |
| Cloud option | Yes | Yes | No |
| LM call details | Prompts, tokens, cost | Prompts, tokens | Custom attributes |
| Best for | DSPy-first teams | Teams wanting open-source + UI | Teams already using Jaeger |
## Step 6: Search and filter traces
### Find traces by criteria
```python
def search_traces(traces, **filters):
    """Search traces by user, time range, latency, or content."""
    results = traces
    if "min_latency_ms" in filters:
        results = [t for t in results if t["total_latency_ms"] >= filters["min_latency_ms"]]
    if "after" in filters:
        results = [t for t in results if t["timestamp"] >= filters["after"]]
    if "before" in filters:
        results = [t for t in results if t["timestamp"] <= filters["before"]]
    if "contains" in filters:
        keyword = filters["contains"].lower()
        results = [
            t for t in results
            if keyword in json.dumps(t).lower()
        ]
    return results
```
```python
# Find slow requests from today
slow = search_traces(
    load_traces(),
    min_latency_ms=3000,
    after="2025-01-15T00:00:00",
)
```
### Aggregate trace statistics

```python
def trace_stats(traces):
    """Summary statistics across traces."""
    latencies = [t["total_latency_ms"] for t in traces]
    if not latencies:
        return "No traces found"
    latencies.sort()
    return {
        "count": len(latencies),
        "p50_ms": latencies[len(latencies) // 2],
        "p95_ms": latencies[int(len(latencies) * 0.95)],
        "p99_ms": latencies[int(len(latencies) * 0.99)],
        "max_ms": latencies[-1],
    }
```
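As a worked example of the percentile indexing (with made-up latencies): for 10 sorted values, `len // 2` lands on index 5 for p50 and `int(len * 0.95)` lands on index 9 for p95:

```python
latencies = sorted([820, 450, 3100, 640, 910, 500, 720, 5400, 610, 480])
# -> [450, 480, 500, 610, 640, 720, 820, 910, 3100, 5400]

p50 = latencies[len(latencies) // 2]         # index 5 -> 720
p95 = latencies[int(len(latencies) * 0.95)]  # index 9 -> 5400
print(p50, p95)  # 720 5400
```

Note that for an even-length list this picks the upper median, which is close enough for a quick health check.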
## Step 7: Use traces to improve your AI
Traces aren't just for debugging — they're a source of improvement.
### Find patterns in wrong answers
```python
# Load traces where the answer was marked wrong by a user or metric
wrong_traces = search_traces(load_traces(), contains='"is_correct": false')

# Check which step is most often the bottleneck
from collections import Counter

slow_steps = Counter()
for t in wrong_traces:
    slowest = max(t["steps"], key=lambda s: s["latency_ms"])
    slow_steps[slowest["step"]] += 1

print(slow_steps)
# Counter({"retrieve": 23, "answer": 7})
# -> Retrieval is the problem, not the answer generation
```
### Build training data from failures
```python
# Extract failed examples for re-optimization
failed_examples = []
for t in wrong_traces:
    ex = dspy.Example(
        question=t.get("inputs", {}).get("question", ""),
    ).with_inputs("question")
    failed_examples.append(ex)

# Add to training set and re-optimize; see /ai-improving-accuracy
```
## Key patterns
- Start with `dspy.inspect_history` — it's free and solves most debugging needs
- Add JSONL tracing before you need it — you can't debug traces you didn't log
- Trace at the step level, not just the request level — per-step latency reveals bottlenecks
- Use OpenTelemetry for production — it's the standard and works with any backend
- Langtrace is easiest for DSPy — one-line setup with automatic instrumentation
- Traces feed optimization — patterns in wrong answers tell you what to fix
## Additional resources
- For worked examples, see examples.md
- Use /ai-monitoring for aggregate health checks across all requests
- Use /ai-fixing-errors for code-level debugging (crashes, config issues)
- Use /ai-building-pipelines to structure pipelines that are easy to trace
- Use /ai-improving-accuracy to optimize based on patterns found in traces