ai-tracing-requests


See What Your AI Did on a Specific Request


Guide the user through tracing and debugging individual AI requests. The goal: for any request, see every LM call, retrieval step, intermediate result, token count, and latency.

When you need this


  • A customer reports a wrong answer — you need to see exactly what happened
  • Your pipeline is slow — you need to find which step is the bottleneck
  • Compliance requires audit trails of every AI decision
  • QA wants to inspect AI behavior before launch
  • You're debugging why an agent took unexpected actions

How it's different from monitoring


| | Monitoring (/ai-monitoring) | Tracing (this skill) |
|---|---|---|
| Scope | Aggregate health across all requests | Single request, full detail |
| Question answered | "Is accuracy dropping this week?" | "Why did customer #12345 get a wrong answer at 2:14pm?" |
| Output | Scores, trends, alerts | Call traces, intermediate results, latencies |
| Timing | Periodic batch evaluation | Per-request, real-time |

Step 1: Understand the need


Quick decision tree:

```
What are you debugging?
|
+- A specific wrong answer right now?
|  -> Step 2: Quick debugging with dspy.inspect_history
|
+- Need to trace requests in a running app?
|  -> Step 3-4: Add per-step tracing
|
+- Need a visual trace viewer for your team?
|  -> Step 5: Connect Langtrace, Phoenix, or Jaeger
|
+- Need to find patterns across many traces?
   -> Step 6: Search and filter traces
```

Step 2: Quick debugging (no extra tools needed)


Inspect the last LM calls


The fastest way to see what happened:

```python
import dspy

# Run your program
result = my_program(question="What is our refund policy?")

# See the last 5 LM calls — shows full prompts and responses
dspy.inspect_history(n=5)
```

This shows:
- The full prompt sent to the LM (including system message, few-shot examples, input)
- The LM's raw response
- How DSPy parsed the response into fields

Time individual steps

```python
import time

result = my_program(question="test")

# Quick manual timing
start = time.time()
step1_result = my_program.step1(question="test")
step1_time = time.time() - start
print(f"Step 1: {step1_time:.2f}s")

start = time.time()
step2_result = my_program.step2(context=step1_result.context, question="test")
step2_time = time.time() - start
print(f"Step 2: {step2_time:.2f}s")
```

JSONL trace logging


For persistent traces without any extra dependencies:

```python
import json
import time
from datetime import datetime

import dspy

class TracedProgram(dspy.Module):
    """Wraps any DSPy program to log per-request traces to JSONL."""
    def __init__(self, program, log_path="traces.jsonl"):
        super().__init__()
        self.program = program
        self.log_path = log_path

    def forward(self, **kwargs):
        trace_id = datetime.now().strftime("%Y%m%d_%H%M%S_%f")

        start = time.time()
        result = self.program(**kwargs)
        total_time = time.time() - start

        # Log the trace as one JSON object per line
        entry = {
            "trace_id": trace_id,
            "timestamp": datetime.now().isoformat(),
            "inputs": {k: str(v) for k, v in kwargs.items()},
            "outputs": {k: str(getattr(result, k, "")) for k in result.keys()},
            "total_latency_ms": round(total_time * 1000),
        }
        with open(self.log_path, "a") as f:
            f.write(json.dumps(entry) + "\n")

        return result
```

Use it:

```python
traced = TracedProgram(my_program)
result = traced(question="How do refunds work?")
```
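Reading the log back is just line-by-line JSON parsing. A minimal standalone sketch of the round trip (the entries and temp-file path here are synthetic, shaped like TracedProgram's output):

```python
import json
import os
import tempfile

# Two synthetic trace entries, shaped like what TracedProgram writes
entries = [
    {"trace_id": "t1", "total_latency_ms": 800},
    {"trace_id": "t2", "total_latency_ms": 2400},
]

path = os.path.join(tempfile.mkdtemp(), "traces.jsonl")
with open(path, "a") as f:
    for e in entries:
        f.write(json.dumps(e) + "\n")

# Read every trace back and pick the slowest one
with open(path) as f:
    traces = [json.loads(line) for line in f]

slowest = max(traces, key=lambda t: t["total_latency_ms"])
print(slowest["trace_id"])  # t2
```

One JSON object per line keeps the log greppable and re-parseable at any time without a database.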

Step 3: Per-step tracing in pipelines


For multi-step pipelines, trace each stage separately to see exactly where things go wrong:
```python
import json
import time
import uuid
from datetime import datetime

class StepTracer:
    """Collects per-step timing and intermediate results."""
    def __init__(self):
        self.steps = []
        self.trace_id = str(uuid.uuid4())[:8]

    def trace_step(self, name, func, **kwargs):
        """Run a step and record its inputs, outputs, and latency."""
        start = time.time()
        result = func(**kwargs)
        latency = time.time() - start

        self.steps.append({
            "step": name,
            "inputs": {k: str(v)[:200] for k, v in kwargs.items()},
            "outputs": {k: str(getattr(result, k, ""))[:200] for k in result.keys()},
            "latency_ms": round(latency * 1000),
        })
        return result

    def summary(self):
        """Print a summary of all traced steps."""
        print(f"Trace {self.trace_id}:")
        total = sum(s["latency_ms"] for s in self.steps)
        for step in self.steps:
            pct = step["latency_ms"] / total * 100 if total > 0 else 0
            print(f"  {step['step']}: {step['latency_ms']}ms ({pct:.0f}%)")
        print(f"  Total: {total}ms")

    def to_dict(self):
        return {
            "trace_id": self.trace_id,
            "timestamp": datetime.now().isoformat(),
            "steps": self.steps,
            "total_latency_ms": sum(s["latency_ms"] for s in self.steps),
        }
```

Use in a pipeline

Use in a pipeline

class TracedRAG(dspy.Module): def init(self): self.retrieve = dspy.Retrieve(k=3) self.answer = dspy.ChainOfThought("context, question -> answer")
def forward(self, question):
    tracer = StepTracer()

    retrieval = tracer.trace_step("retrieve", self.retrieve, query=question)

    answer = tracer.trace_step(
        "answer", self.answer,
        context=retrieval.passages, question=question,
    )

    tracer.summary()
    # Trace a1b2c3d4:
    #   retrieve: 120ms (15%)
    #   answer: 680ms (85%)
    #   Total: 800ms

    return answer
undefined
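The percentages in summary() are each step's share of total latency; with the sample latencies from the trace above, the arithmetic works out as:

```python
# Share-of-total math behind the summary() percentages
steps = [("retrieve", 120), ("answer", 680)]
total = sum(ms for _, ms in steps)  # 800
shares = {name: round(ms / total * 100) for name, ms in steps}
print(total, shares)  # 800 {'retrieve': 15, 'answer': 85}
```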
class TracedRAG(dspy.Module): def init(self): self.retrieve = dspy.Retrieve(k=3) self.answer = dspy.ChainOfThought("context, question -> answer")
def forward(self, question):
    tracer = StepTracer()

    retrieval = tracer.trace_step("retrieve", self.retrieve, query=question)

    answer = tracer.trace_step(
        "answer", self.answer,
        context=retrieval.passages, question=question,
    )

    tracer.summary()
    # Trace a1b2c3d4:
    #   retrieve: 120ms (15%)
    #   answer: 680ms (85%)
    #   Total: 800ms

    return answer
undefined

Save traces for later analysis


```python
def save_trace(tracer, path="traces.jsonl"):
    with open(path, "a") as f:
        f.write(json.dumps(tracer.to_dict()) + "\n")
```

Load and analyze traces:

```python
def load_traces(path="traces.jsonl"):
    with open(path) as f:
        return [json.loads(line) for line in f]

def find_slow_traces(traces, threshold_ms=2000):
    return [t for t in traces if t["total_latency_ms"] > threshold_ms]

def find_failed_steps(traces):
    return [
        t for t in traces
        if any("error" in str(s.get("outputs", "")).lower() for s in t["steps"])
    ]
```

Step 4: OpenTelemetry instrumentation


For production tracing with any backend (Jaeger, Zipkin, Datadog, etc.):

```python
import json
import time

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
```

Setup — do this once at app startup:

```python
provider = TracerProvider()
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("my-ai-app")

class OTelTracedProgram(dspy.Module):
    """Wraps a DSPy program with OpenTelemetry spans."""
    def __init__(self, program):
        super().__init__()
        self.program = program

    def forward(self, **kwargs):
        with tracer.start_as_current_span("ai_request") as span:
            span.set_attribute("ai.inputs", json.dumps({k: str(v) for k, v in kwargs.items()}))

            start = time.time()
            result = self.program(**kwargs)
            latency = time.time() - start

            span.set_attribute("ai.latency_ms", round(latency * 1000))
            span.set_attribute("ai.outputs", json.dumps(
                {k: str(getattr(result, k, "")) for k in result.keys()}
            ))

            return result
```

Trace individual pipeline steps with OTel


```python
class OTelTracedRAG(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.answer = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        with tracer.start_as_current_span("rag_pipeline") as parent:
            parent.set_attribute("question", question)

            with tracer.start_as_current_span("retrieve"):
                retrieval = self.retrieve(query=question)

            with tracer.start_as_current_span("generate_answer"):
                answer = self.answer(
                    context=retrieval.passages, question=question
                )

            return answer
```

Step 5: Connect a trace viewer


Option A: Langtrace (best DSPy integration)


First-class DSPy auto-instrumentation — one line to trace all LM calls:
```bash
pip install langtrace-python-sdk
```

```python
from langtrace_python_sdk import langtrace

langtrace.init(api_key="your-key")  # or use LANGTRACE_API_KEY env var

# That's it — all DSPy calls are now traced automatically
result = my_program(question="test")

# View traces at app.langtrace.ai
```

Option B: Arize Phoenix (open-source, self-hosted)


```bash
pip install arize-phoenix openinference-instrumentation-dspy
```

```python
import phoenix as px
from openinference.instrumentation.dspy import DSPyInstrumentor

# Launch local trace viewer — opens at http://localhost:6006
px.launch_app()

# Auto-instrument DSPy
DSPyInstrumentor().instrument()

# All DSPy calls are now traced
result = my_program(question="test")
```

Option C: Jaeger (open-source, Docker)


```bash
docker run -d -p 16686:16686 -p 4317:4317 jaegertracing/all-in-one:latest
```

```python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Export spans to Jaeger (provider from the OpenTelemetry setup in Step 4)
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))

# View traces at http://localhost:16686
```

Comparison


| Feature | Langtrace | Arize Phoenix | Jaeger |
|---|---|---|---|
| DSPy auto-instrumentation | Yes (built-in) | Yes (plugin) | Manual |
| Setup effort | One line | Two lines + Docker | Docker + manual spans |
| Self-hosted option | Yes | Yes | Yes |
| Cloud option | Yes | Yes | No |
| LM call details | Prompts, tokens, cost | Prompts, tokens | Custom attributes |
| Best for | DSPy-first teams | Teams wanting open-source + UI | Teams already using Jaeger |

Step 6: Search and filter traces


Find traces by criteria


```python
def search_traces(traces, **filters):
    """Search traces by time range, latency, or content."""
    results = traces

    if "min_latency_ms" in filters:
        results = [t for t in results if t["total_latency_ms"] >= filters["min_latency_ms"]]

    if "after" in filters:
        results = [t for t in results if t["timestamp"] >= filters["after"]]

    if "before" in filters:
        results = [t for t in results if t["timestamp"] <= filters["before"]]

    if "contains" in filters:
        keyword = filters["contains"].lower()
        results = [
            t for t in results
            if keyword in json.dumps(t).lower()
        ]

    return results
```
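The `after`/`before` filters work on raw strings because fixed-width ISO-8601 timestamps sort lexicographically in chronological order, so no datetime parsing is needed:

```python
from datetime import datetime

a = datetime(2025, 1, 15, 9, 30).isoformat()   # '2025-01-15T09:30:00'
b = datetime(2025, 1, 15, 14, 5).isoformat()   # '2025-01-15T14:05:00'

# String order matches time order for same-format ISO timestamps
assert a < b
assert b >= "2025-01-15T00:00:00"
```

This only holds while all timestamps share the same format and timezone, which is why the tracers above always log `datetime.now().isoformat()`.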

Find slow requests from today:

```python
slow = search_traces(
    load_traces(),
    min_latency_ms=3000,
    after="2025-01-15T00:00:00",
)
```

Aggregate trace statistics


```python
def trace_stats(traces):
    """Summary statistics across traces."""
    latencies = [t["total_latency_ms"] for t in traces]
    if not latencies:
        return "No traces found"

    latencies.sort()
    return {
        "count": len(latencies),
        "p50_ms": latencies[len(latencies) // 2],
        "p95_ms": latencies[int(len(latencies) * 0.95)],
        "p99_ms": latencies[int(len(latencies) * 0.99)],
        "max_ms": latencies[-1],
    }
```
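These are nearest-rank index picks rather than interpolated percentiles, which is plenty for eyeballing tail latency. A worked example on ten synthetic latencies:

```python
latencies = sorted([100, 200, 300, 400, 500, 600, 700, 800, 900, 5000])
n = len(latencies)  # 10

p50 = latencies[n // 2]          # index 5 -> 600
p95 = latencies[int(n * 0.95)]   # index 9 -> 5000
mx = latencies[-1]               # 5000

print(p50, p95, mx)  # 600 5000 5000
```

Note how the single 5000ms outlier dominates the tail: on a sample this small, p95 and max coincide.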

Step 7: Use traces to improve your AI


Traces aren't just for debugging — they're a source of improvement.

Find patterns in wrong answers


```python
from collections import Counter

# Load traces where the answer was marked wrong by a user or metric
wrong_traces = search_traces(load_traces(), contains='"is_correct": false')

# Check which step is most often the bottleneck
slow_steps = Counter()
for t in wrong_traces:
    slowest = max(t["steps"], key=lambda s: s["latency_ms"])
    slow_steps[slowest["step"]] += 1

print(slow_steps)
# Counter({"retrieve": 23, "answer": 7})
# -> Retrieval is the problem, not the answer generation
```
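The bottleneck count generalizes to any per-trace attribute. A standalone run over synthetic traces (the step names and latencies here are invented for illustration):

```python
from collections import Counter

# Three synthetic traces in the StepTracer to_dict() shape
traces = [
    {"steps": [{"step": "retrieve", "latency_ms": 900}, {"step": "answer", "latency_ms": 300}]},
    {"steps": [{"step": "retrieve", "latency_ms": 100}, {"step": "answer", "latency_ms": 700}]},
    {"steps": [{"step": "retrieve", "latency_ms": 800}, {"step": "answer", "latency_ms": 200}]},
]

# Count which step was slowest in each trace
slow_steps = Counter()
for t in traces:
    slowest = max(t["steps"], key=lambda s: s["latency_ms"])
    slow_steps[slowest["step"]] += 1

print(slow_steps)  # Counter({'retrieve': 2, 'answer': 1})
```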

Build training data from failures


```python
# Extract failed examples for re-optimization
failed_examples = []
for t in wrong_traces:
    ex = dspy.Example(
        question=t.get("inputs", {}).get("question", ""),
    ).with_inputs("question")
    failed_examples.append(ex)

# Add to training set and re-optimize
# See /ai-improving-accuracy
```

Key patterns


  • Start with dspy.inspect_history — it's free and solves most debugging needs
  • Add JSONL tracing before you need it — you can't debug traces you didn't log
  • Trace at the step level, not just the request level — per-step latency reveals bottlenecks
  • Use OpenTelemetry for production — it's the standard, works with any backend
  • Langtrace is easiest for DSPy — one-line setup with automatic instrumentation
  • Traces feed optimization — patterns in wrong answers tell you what to fix

Additional resources


  • For worked examples, see examples.md
  • Use /ai-monitoring for aggregate health checks across all requests
  • Use /ai-fixing-errors for code-level debugging (crashes, config issues)
  • Use /ai-building-pipelines to structure pipelines that are easy to trace
  • Use /ai-improving-accuracy to optimize based on patterns found in traces