dspy-debugging-observability
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseDSPy Debugging & Observability
DSPy调试与可观测性
Goal
目标
Debug, trace, and monitor DSPy programs using built-in inspection, MLflow tracing, and custom callbacks for production observability.
使用内置检查功能、MLflow追踪和自定义回调,对DSPy程序进行调试、追踪和监控,实现生产环境的可观测性。
When to Use
使用场景
- Debugging unexpected outputs
- Understanding multi-step program flow
- Production monitoring (cost, latency, errors)
- Analyzing optimizer behavior
- Tracking LLM API usage
- 调试异常输出
- 理解多步骤程序流程
- 生产环境监控(成本、延迟、错误)
- 分析优化器行为
- 追踪LLM API使用情况
Related Skills
相关Skill
- Optimize programs: dspy-miprov2-optimizer
- Evaluate quality: dspy-evaluation-suite
- Build agents: dspy-react-agent-builder
- 优化程序:dspy-miprov2-optimizer
- 评估质量:dspy-evaluation-suite
- 构建Agent:dspy-react-agent-builder
Inputs
输入参数
| Input | Type | Description |
|---|---|---|
| | Program to debug/monitor |
| | Optional custom callback (subclass of |
| 输入 | 类型 | 描述 |
|---|---|---|
| | 待调试/监控的程序 |
| | 可选自定义回调(继承自 |
Outputs
输出结果
| Output | Type | Description |
|---|---|---|
| | Raw execution trace from |
| | Cost, latency, token counts from callbacks |
| 输出 | 类型 | 描述 |
|---|---|---|
| | 来自 |
| | 来自回调的成本、延迟、令牌计数数据 |
Workflow
工作流程
Phase 1: Basic Inspection with inspect_history()
阶段1:使用inspect_history()进行基础检查
The simplest debugging approach:
python
import dspy
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))最简单的调试方法:
python
import dspy
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))Run program
运行程序
qa = dspy.ChainOfThought("question -> answer")
result = qa(question="What is the capital of France?")
qa = dspy.ChainOfThought("question -> answer")
result = qa(question="What is the capital of France?")
Inspect last execution (prints to console)
检查最近一次执行(打印到控制台)
dspy.inspect_history(n=1)
dspy.inspect_history(n=1)
To access raw history programmatically:
以编程方式访问原始历史记录:
from dspy.clients.base_lm import GLOBAL_HISTORY
for entry in GLOBAL_HISTORY[-1:]:
print(f"Model: {entry['model']}")
print(f"Usage: {entry.get('usage', {})}")
print(f"Cost: {entry.get('cost', 0)}")
undefinedfrom dspy.clients.base_lm import GLOBAL_HISTORY
for entry in GLOBAL_HISTORY[-1:]:
print(f"Model: {entry['model']}")
print(f"Usage: {entry.get('usage', {})}")
print(f"Cost: {entry.get('cost', 0)}")
undefinedPhase 2: MLflow Tracing
阶段2:MLflow追踪
MLflow integration requires explicit setup:
python
import dspy
import mlflowMLflow集成需要显式设置:
python
import dspy
import mlflowSetup MLflow (4 steps required)
设置MLflow(需4个步骤)
1. Set tracking URI and experiment
1. 设置追踪URI和实验
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("DSPy")
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("DSPy")
2. Enable DSPy autologging
2. 启用DSPy自动日志
mlflow.dspy.autolog(
log_traces=True, # Log traces during inference
log_traces_from_compile=True, # Log traces when compiling/optimizing
log_traces_from_eval=True, # Log traces during evaluation
log_compiles=True, # Log optimization process info
log_evals=True # Log evaluation call info
)
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
mlflow.dspy.autolog(
log_traces=True, # 推理期间记录追踪
log_traces_from_compile=True, # 编译/优化时记录追踪
log_traces_from_eval=True, # 评估期间记录追踪
log_compiles=True, # 记录优化过程信息
log_evals=True # 记录评估调用信息
)
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
Configure retriever (required before using dspy.Retrieve)
配置检索器(使用dspy.Retrieve前必须配置)
rm = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")
dspy.configure(rm=rm)
class RAGPipeline(dspy.Module):
def init(self):
self.retrieve = dspy.Retrieve(k=3)
self.generate = dspy.ChainOfThought("context, question -> answer")
def forward(self, question):
context = self.retrieve(question).passages
return self.generate(context=context, question=question)pipeline = RAGPipeline()
result = pipeline(question="What is machine learning?")
rm = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")
dspy.configure(rm=rm)
class RAGPipeline(dspy.Module):
def init(self):
self.retrieve = dspy.Retrieve(k=3)
self.generate = dspy.ChainOfThought("context, question -> answer")
def forward(self, question):
context = self.retrieve(question).passages
return self.generate(context=context, question=question)pipeline = RAGPipeline()
result = pipeline(question="What is machine learning?")
View traces in MLflow UI (run in terminal): mlflow ui --port 5000
在MLflow UI中查看追踪记录(终端运行):mlflow ui --port 5000
MLflow captures LLM calls, token usage, costs, and execution times when autolog is enabled.
启用自动日志后,MLflow会捕获LLM调用、令牌使用、成本和执行时间。Phase 3: Custom Callbacks for Production
阶段3:面向生产环境的自定义回调
Build custom callbacks for specialized monitoring:
python
import dspy
from dspy.utils.callback import BaseCallback
import logging
import time
from typing import Any
logger = logging.getLogger(__name__)
class ProductionMonitoringCallback(BaseCallback):
"""Track cost, latency, and errors in production."""
def __init__(self):
super().__init__()
self.total_cost = 0.0
self.total_tokens = 0
self.call_count = 0
self.errors = []
self.start_times = {}
def on_lm_start(self, call_id: str, instance: Any, inputs: dict[str, Any]):
"""Called when LM is invoked."""
self.start_times[call_id] = time.time()
def on_lm_end(self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None):
"""Called after LM finishes."""
if exception:
self.errors.append(str(exception))
logger.error(f"LLM error: {exception}")
return
# Calculate latency
start = self.start_times.pop(call_id, time.time())
latency = time.time() - start
# Extract usage from outputs
usage = outputs.get('usage', {}) if isinstance(outputs, dict) else {}
tokens = usage.get('total_tokens', 0)
model = outputs.get('model', 'unknown') if isinstance(outputs, dict) else 'unknown'
cost = self._estimate_cost(model, usage)
self.total_tokens += tokens
self.total_cost += cost
self.call_count += 1
logger.info(f"LLM call: {latency:.2f}s, {tokens} tokens, ${cost:.4f}")
def _estimate_cost(self, model: str, usage: dict[str, int]) -> float:
"""Estimate cost based on model pricing (update rates for 2026)."""
pricing = {
'gpt-4o-mini': {'input': 0.00015 / 1000, 'output': 0.0006 / 1000},
'gpt-4o': {'input': 0.0025 / 1000, 'output': 0.01 / 1000},
}
model_key = next((k for k in pricing if k in model), 'gpt-4o-mini')
input_cost = usage.get('prompt_tokens', 0) * pricing[model_key]['input']
output_cost = usage.get('completion_tokens', 0) * pricing[model_key]['output']
return input_cost + output_cost
def get_metrics(self) -> dict[str, Any]:
"""Return aggregated metrics."""
return {
'total_cost': self.total_cost,
'total_tokens': self.total_tokens,
'call_count': self.call_count,
'avg_cost_per_call': self.total_cost / max(self.call_count, 1),
'error_count': len(self.errors)
}构建自定义回调以实现专项监控:
python
import dspy
from dspy.utils.callback import BaseCallback
import logging
import time
from typing import Any
logger = logging.getLogger(__name__)
class ProductionMonitoringCallback(BaseCallback):
"""追踪生产环境中的成本、延迟和错误。"""
def __init__(self):
super().__init__()
self.total_cost = 0.0
self.total_tokens = 0
self.call_count = 0
self.errors = []
self.start_times = {}
def on_lm_start(self, call_id: str, instance: Any, inputs: dict[str, Any]):
"""当LLM被调用时触发。"""
self.start_times[call_id] = time.time()
def on_lm_end(self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None):
"""LLM调用完成后触发。"""
if exception:
self.errors.append(str(exception))
logger.error(f"LLM error: {exception}")
return
# 计算延迟
start = self.start_times.pop(call_id, time.time())
latency = time.time() - start
# 从输出中提取使用数据
usage = outputs.get('usage', {}) if isinstance(outputs, dict) else {}
tokens = usage.get('total_tokens', 0)
model = outputs.get('model', 'unknown') if isinstance(outputs, dict) else 'unknown'
cost = self._estimate_cost(model, usage)
self.total_tokens += tokens
self.total_cost += cost
self.call_count += 1
logger.info(f"LLM call: {latency:.2f}s, {tokens} tokens, ${cost:.4f}")
def _estimate_cost(self, model: str, usage: dict[str, int]) -> float:
"""根据模型定价估算成本(2026年费率已更新)。"""
pricing = {
'gpt-4o-mini': {'input': 0.00015 / 1000, 'output': 0.0006 / 1000},
'gpt-4o': {'input': 0.0025 / 1000, 'output': 0.01 / 1000},
}
model_key = next((k for k in pricing if k in model), 'gpt-4o-mini')
input_cost = usage.get('prompt_tokens', 0) * pricing[model_key]['input']
output_cost = usage.get('completion_tokens', 0) * pricing[model_key]['output']
return input_cost + output_cost
def get_metrics(self) -> dict[str, Any]:
"""返回聚合后的指标数据。"""
return {
'total_cost': self.total_cost,
'total_tokens': self.total_tokens,
'call_count': self.call_count,
'avg_cost_per_call': self.total_cost / max(self.call_count, 1),
'error_count': len(self.errors)
}Usage
使用示例
monitor = ProductionMonitoringCallback()
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[monitor])
monitor = ProductionMonitoringCallback()
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[monitor])
Run your program
运行程序
qa = dspy.ChainOfThought("question -> answer")
for question in questions:
result = qa(question=question)
qa = dspy.ChainOfThought("question -> answer")
for question in questions:
result = qa(question=question)
Get metrics
获取指标数据
metrics = monitor.get_metrics()
print(f"Total cost: ${metrics['total_cost']:.2f}")
print(f"Total calls: {metrics['call_count']}")
print(f"Errors: {metrics['error_count']}")
undefinedmetrics = monitor.get_metrics()
print(f"Total cost: ${metrics['total_cost']:.2f}")
print(f"Total calls: {metrics['call_count']}")
print(f"Errors: {metrics['error_count']}")
undefinedPhase 4: Sampling for High-Volume Production
阶段4:高流量生产环境的采样策略
For high-traffic applications, sample traces to reduce overhead:
python
import random
from dspy.utils.callback import BaseCallback
from typing import Any
class SamplingCallback(BaseCallback):
"""Sample 10% of traces."""
def __init__(self, sample_rate: float = 0.1):
super().__init__()
self.sample_rate = sample_rate
self.sampled_calls = []
def on_lm_end(self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None):
"""Sample a subset of LM calls."""
if random.random() < self.sample_rate:
self.sampled_calls.append({
'call_id': call_id,
'outputs': outputs,
'exception': exception
})针对高流量应用,可对追踪记录进行采样以降低开销:
python
import random
from dspy.utils.callback import BaseCallback
from typing import Any
class SamplingCallback(BaseCallback):
"""对10%的追踪记录进行采样。"""
def __init__(self, sample_rate: float = 0.1):
super().__init__()
self.sample_rate = sample_rate
self.sampled_calls = []
def on_lm_end(self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None):
"""对部分LLM调用进行采样。"""
if random.random() < self.sample_rate:
self.sampled_calls.append({
'call_id': call_id,
'outputs': outputs,
'exception': exception
})Use with high-volume apps
在高流量应用中使用
callback = SamplingCallback(sample_rate=0.1)
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[callback])
undefinedcallback = SamplingCallback(sample_rate=0.1)
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[callback])
undefinedBest Practices
最佳实践
- Use inspect_history() for debugging - Quick inspection during development
- MLflow for comprehensive tracing - Automatic instrumentation in production
- Sample high-volume traces - Reduce overhead with 1-10% sampling
- Privacy-aware logging - Redact PII before logging
- Async callbacks - Non-blocking callbacks for production
- 使用inspect_history()进行调试 - 开发期间快速检查
- MLflow实现全面追踪 - 生产环境中的自动埋点
- 对高流量追踪记录采样 - 采用1-10%的采样率降低开销
- 隐私友好型日志 - 记录前脱敏PII数据
- 异步回调 - 生产环境使用非阻塞回调
Limitations
局限性
- Callbacks are synchronous by default (can block LLM calls)
- MLflow tracing adds ~5-10ms overhead per call
- inspect_history() only stores recent calls (last 100 by default)
- Custom callbacks don't capture internal optimizer steps
- Cost estimation requires manual pricing table updates
- 默认回调为同步模式(可能阻塞LLM调用)
- MLflow追踪每次调用会增加约5-10ms的开销
- inspect_history()仅存储最近的调用记录(默认最多100条)
- 自定义回调无法捕获优化器的内部步骤
- 成本估算需要手动更新定价表
Official Documentation
官方文档
- DSPy Documentation: https://dspy.ai/
- DSPy GitHub: https://github.com/stanfordnlp/dspy
- Observability Guide: https://dspy.ai/tutorials/observability/
- DSPy文档: https://dspy.ai/
- DSPy GitHub: https://github.com/stanfordnlp/dspy
- 可观测性指南: https://dspy.ai/tutorials/observability/