dspy-debugging-observability


DSPy Debugging & Observability


Goal


Debug, trace, and monitor DSPy programs using built-in inspection, MLflow tracing, and custom callbacks for production observability.

When to Use


  • Debugging unexpected outputs
  • Understanding multi-step program flow
  • Production monitoring (cost, latency, errors)
  • Analyzing optimizer behavior
  • Tracking LLM API usage

Related Skills


  • Optimize programs: dspy-miprov2-optimizer
  • Evaluate quality: dspy-evaluation-suite
  • Build agents: dspy-react-agent-builder

Inputs


Input      Type           Description
program    dspy.Module    Program to debug/monitor
callback   BaseCallback   Optional custom callback (subclass of dspy.utils.callback.BaseCallback)
Outputs

输出结果

Output          Type         Description
GLOBAL_HISTORY  list[dict]   Raw execution trace from dspy.clients.base_lm
metrics         dict         Cost, latency, token counts from callbacks

Workflow


Phase 1: Basic Inspection with inspect_history()


The simplest debugging approach:
python
import dspy

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

Run program


qa = dspy.ChainOfThought("question -> answer")
result = qa(question="What is the capital of France?")

Inspect last execution (prints to console)


dspy.inspect_history(n=1)

To access raw history programmatically:


from dspy.clients.base_lm import GLOBAL_HISTORY

for entry in GLOBAL_HISTORY[-1:]:
    print(f"Model: {entry['model']}")
    print(f"Usage: {entry.get('usage', {})}")
    print(f"Cost: {entry.get('cost', 0)}")
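The same entries can be aggregated for a quick cost summary. A minimal sketch over history-shaped dicts; the sample entries here are illustrative stand-ins, not real DSPy output:

```python
# Aggregate cost and token counts over entries shaped like the
# GLOBAL_HISTORY dicts above (the sample data below is illustrative only).
def summarize_history(history: list[dict]) -> dict:
    total_cost = sum(e.get("cost") or 0 for e in history)
    total_tokens = sum((e.get("usage") or {}).get("total_tokens", 0) for e in history)
    return {"calls": len(history), "cost": total_cost, "tokens": total_tokens}

sample = [
    {"model": "openai/gpt-4o-mini", "usage": {"total_tokens": 120}, "cost": 0.0002},
    {"model": "openai/gpt-4o-mini", "usage": {"total_tokens": 80}, "cost": 0.0001},
]
print(summarize_history(sample))
```

In a real program, pass GLOBAL_HISTORY (or a slice of it) instead of the sample list.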

Phase 2: MLflow Tracing


MLflow integration requires explicit setup:
python
import dspy
import mlflow

Setup MLflow (4 steps required)


1. Set tracking URI and experiment


mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("DSPy")

2. Enable DSPy autologging


mlflow.dspy.autolog(
    log_traces=True,               # Log traces during inference
    log_traces_from_compile=True,  # Log traces when compiling/optimizing
    log_traces_from_eval=True,     # Log traces during evaluation
    log_compiles=True,             # Log optimization process info
    log_evals=True,                # Log evaluation call info
)

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

Configure retriever (required before using dspy.Retrieve)


rm = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")
dspy.configure(rm=rm)

class RAGPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=3)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        context = self.retrieve(question).passages
        return self.generate(context=context, question=question)

pipeline = RAGPipeline()
result = pipeline(question="What is machine learning?")

View traces in MLflow UI (run in terminal): mlflow ui --port 5000



MLflow captures LLM calls, token usage, costs, and execution times when autolog is enabled.


Phase 3: Custom Callbacks for Production


Build custom callbacks for specialized monitoring:
python
import dspy
from dspy.utils.callback import BaseCallback
import logging
import time
from typing import Any

logger = logging.getLogger(__name__)

class ProductionMonitoringCallback(BaseCallback):
    """Track cost, latency, and errors in production."""

    def __init__(self):
        super().__init__()
        self.total_cost = 0.0
        self.total_tokens = 0
        self.call_count = 0
        self.errors = []
        self.start_times = {}

    def on_lm_start(self, call_id: str, instance: Any, inputs: dict[str, Any]):
        """Called when LM is invoked."""
        self.start_times[call_id] = time.time()

    def on_lm_end(self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None):
        """Called after LM finishes."""
        if exception:
            self.errors.append(str(exception))
            logger.error(f"LLM error: {exception}")
            return

        # Calculate latency
        start = self.start_times.pop(call_id, time.time())
        latency = time.time() - start

        # Extract usage from outputs
        usage = outputs.get('usage', {}) if isinstance(outputs, dict) else {}
        tokens = usage.get('total_tokens', 0)
        model = outputs.get('model', 'unknown') if isinstance(outputs, dict) else 'unknown'
        cost = self._estimate_cost(model, usage)

        self.total_tokens += tokens
        self.total_cost += cost
        self.call_count += 1

        logger.info(f"LLM call: {latency:.2f}s, {tokens} tokens, ${cost:.4f}")

    def _estimate_cost(self, model: str, usage: dict[str, int]) -> float:
        """Estimate cost based on model pricing (update rates for 2026)."""
        pricing = {
            'gpt-4o-mini': {'input': 0.00015 / 1000, 'output': 0.0006 / 1000},
            'gpt-4o': {'input': 0.0025 / 1000, 'output': 0.01 / 1000},
        }
        model_key = next((k for k in pricing if k in model), 'gpt-4o-mini')
        input_cost = usage.get('prompt_tokens', 0) * pricing[model_key]['input']
        output_cost = usage.get('completion_tokens', 0) * pricing[model_key]['output']
        return input_cost + output_cost

    def get_metrics(self) -> dict[str, Any]:
        """Return aggregated metrics."""
        return {
            'total_cost': self.total_cost,
            'total_tokens': self.total_tokens,
            'call_count': self.call_count,
            'avg_cost_per_call': self.total_cost / max(self.call_count, 1),
            'error_count': len(self.errors)
        }

Usage


monitor = ProductionMonitoringCallback()
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[monitor])

Run your program


qa = dspy.ChainOfThought("question -> answer")
for question in questions:
    result = qa(question=question)

Get metrics


metrics = monitor.get_metrics()
print(f"Total cost: ${metrics['total_cost']:.2f}")
print(f"Total calls: {metrics['call_count']}")
print(f"Errors: {metrics['error_count']}")

Phase 4: Sampling for High-Volume Production


For high-traffic applications, sample traces to reduce overhead:
python
import random
from dspy.utils.callback import BaseCallback
from typing import Any

class SamplingCallback(BaseCallback):
    """Sample 10% of traces."""

    def __init__(self, sample_rate: float = 0.1):
        super().__init__()
        self.sample_rate = sample_rate
        self.sampled_calls = []

    def on_lm_end(self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None):
        """Sample a subset of LM calls."""
        if random.random() < self.sample_rate:
            self.sampled_calls.append({
                'call_id': call_id,
                'outputs': outputs,
                'exception': exception
            })
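One refinement worth noting: random.random() makes sampling of retried calls inconsistent, since the same request may be kept on one attempt and dropped on the next. Hashing the call id instead gives a deterministic keep/drop decision. A hedged sketch; keep_trace is a hypothetical helper, not a DSPy API:

```python
import hashlib

# Deterministic sampling sketch: the same call_id always maps to the same
# keep/drop decision, unlike random.random() in SamplingCallback above.
# keep_trace is a hypothetical helper, not part of DSPy.
def keep_trace(call_id: str, sample_rate: float = 0.1) -> bool:
    digest = hashlib.sha256(call_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64  # uniform in [0, 1)
    return bucket < sample_rate

# Decisions are stable across runs and processes:
print(keep_trace("call-123", 0.5) == keep_trace("call-123", 0.5))
# → True
```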

Use with high-volume apps


callback = SamplingCallback(sample_rate=0.1)
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[callback])

Best Practices

最佳实践

  1. Use inspect_history() for debugging - Quick inspection during development
  2. MLflow for comprehensive tracing - Automatic instrumentation in production
  3. Sample high-volume traces - Reduce overhead with 1-10% sampling
  4. Privacy-aware logging - Redact PII before logging
  5. Async callbacks - Non-blocking callbacks for production
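For practice 4, redaction should run before any prompt or completion text reaches a log sink. A minimal sketch; the email and US-style phone patterns are illustrative, not an exhaustive PII policy:

```python
import re

# Illustrative PII patterns only; a production policy needs broader coverage.
EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE = re.compile(r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b")

def redact(text: str) -> str:
    """Replace obvious PII with placeholders before logging."""
    text = EMAIL.sub("[EMAIL]", text)
    return PHONE.sub("[PHONE]", text)

print(redact("Contact jane.doe@example.com or 555-123-4567"))
# → Contact [EMAIL] or [PHONE]
```

A callback's on_lm_start/on_lm_end handlers would call redact() on any text fields before handing them to the logger.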

Limitations

局限性

  • Callbacks are synchronous by default (can block LLM calls)
  • MLflow tracing adds ~5-10ms overhead per call
  • inspect_history() only stores recent calls (last 100 by default)
  • Custom callbacks don't capture internal optimizer steps
  • Cost estimation requires manual pricing table updates
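The first limitation (synchronous callbacks) can be mitigated by having the handler only enqueue events and letting a worker thread do the slow logging. This is a sketch of the pattern only, not tied to dspy's real BaseCallback API:

```python
import queue
import threading

class AsyncMetricsSink:
    """Non-blocking handler sketch: on_lm_end only enqueues; a daemon
    worker drains the queue so the LLM call path never blocks on I/O."""

    def __init__(self):
        self.events = queue.Queue()
        self.processed = []
        threading.Thread(target=self._drain, daemon=True).start()

    def on_lm_end(self, call_id, outputs, exception=None):
        self.events.put((call_id, outputs, exception))  # O(1), non-blocking

    def _drain(self):
        while True:
            event = self.events.get()
            self.processed.append(event)  # stand-in for real logging/export
            self.events.task_done()

sink = AsyncMetricsSink()
sink.on_lm_end("call-1", {"usage": {"total_tokens": 42}})
sink.events.join()  # flush; for demonstration/testing only
print(len(sink.processed))
# → 1
```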

Official Documentation
