dspy-framework

DSPy Framework



```yaml
progressive_disclosure:
  entry_point:
    summary: "Declarative framework for automatic prompt optimization treating prompts as code"
    when_to_use:
      - "When optimizing prompts systematically with evaluation data"
      - "When building production LLM systems requiring accuracy improvements"
      - "When implementing RAG, classification, or structured extraction tasks"
      - "When version-controlled, reproducible prompts are needed"
    quick_start:
      - "pip install dspy-ai"
      - "Define signature: class QA(dspy.Signature): question = dspy.InputField(); answer = dspy.OutputField()"
      - "Create module: qa = dspy.ChainOfThought(QA)"
      - "Optimize: optimizer.compile(qa, trainset=examples)"
  token_estimate:
    entry: 75
    full: 5500
```



Core Philosophy


DSPy (Declarative Self-improving Python) shifts focus from manual prompt engineering to programming language models. Treat prompts as code with:
  • Declarative signatures defining inputs/outputs
  • Automatic optimization via compilers
  • Version control and systematic testing
  • Reproducible results across model changes
Key Principle: Don't write prompts manually. Define task specifications and let DSPy optimize them.

Core Concepts


Signatures: Defining Task Interfaces


Signatures specify what your LM module should do (inputs → outputs) without saying how.
**Basic Signature**:
```python
import dspy

# Inline signature (quick)
qa_module = dspy.ChainOfThought("question -> answer")

# Class-based signature (recommended for production)
class QuestionAnswer(dspy.Signature):
    """Answer questions with short factual answers."""

    question = dspy.InputField()
    answer = dspy.OutputField(desc="often between 1 and 5 words")

# Use signature
qa = dspy.ChainOfThought(QuestionAnswer)
response = qa(question="What is the capital of France?")
print(response.answer)  # "Paris"
```

**Advanced Signatures with Type Hints**:
```python
from typing import List

class DocumentSummary(dspy.Signature):
    """Generate concise document summaries."""

    document: str = dspy.InputField(desc="Full text to summarize")
    key_points: List[str] = dspy.OutputField(desc="3-5 bullet points")
    summary: str = dspy.OutputField(desc="2-3 sentence summary")
    sentiment: str = dspy.OutputField(desc="positive, negative, or neutral")

# Type hints provide strong typing and validation
summarizer = dspy.ChainOfThought(DocumentSummary)
result = summarizer(document="Long document text...")
```

**Field Descriptions**:
- Short, descriptive phrases (not full sentences)
- Examples: `desc="often between 1 and 5 words"`, `desc="JSON format"`
- Used by optimizers to improve prompt quality

Modules: Building Blocks


Modules are DSPy's reasoning patterns: building blocks that replace manual prompt engineering.
**ChainOfThought (CoT)**:
```python
# Zero-shot reasoning
class Reasoning(dspy.Signature):
    """Solve complex problems step by step."""

    problem = dspy.InputField()
    solution = dspy.OutputField()

cot = dspy.ChainOfThought(Reasoning)
result = cot(problem="Roger has 5 tennis balls. He buys 2 cans of 3 balls each. How many total?")
print(result.solution)   # Includes reasoning steps automatically
print(result.rationale)  # Access the chain-of-thought reasoning
```

**Retrieve Module (RAG)**:
```python
class RAGSignature(dspy.Signature):
    """Answer questions using retrieved context."""
    question = dspy.InputField()
    context = dspy.InputField(desc="relevant passages")
    answer = dspy.OutputField(desc="answer based on context")

# Combine retrieval + reasoning
retriever = dspy.Retrieve(k=3)  # Retrieve top 3 passages
rag = dspy.ChainOfThought(RAGSignature)

# Use in pipeline
question = "What is quantum entanglement?"
context = retriever(question).passages
answer = rag(question=question, context=context)
```

**ReAct (Reasoning + Acting)**:
```python
class ResearchTask(dspy.Signature):
    """Research a topic using tools."""
    topic = dspy.InputField()
    findings = dspy.OutputField()

# ReAct interleaves reasoning with tool calls
react = dspy.ReAct(ResearchTask, tools=[web_search, calculator])
result = react(topic="Apple stock price change last month")
# Automatically uses tools when needed
```


**ProgramOfThought**:
```python

# Generate and execute Python code
class MathProblem(dspy.Signature):
    """Solve math problems by writing Python code."""

    problem = dspy.InputField()
    code = dspy.OutputField(desc="Python code to solve problem")
    result = dspy.OutputField(desc="final numerical answer")

pot = dspy.ProgramOfThought(MathProblem)
answer = pot(problem="Calculate compound interest on $1000 at 5% for 10 years")
```

**Custom Modules**:
```python
class MultiStepRAG(dspy.Module):
    """Custom module combining retrieval and reasoning."""

    def __init__(self, num_passages=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        # Retrieve relevant passages
        context = self.retrieve(question).passages

        # Generate answer with context
        prediction = self.generate(context=context, question=question)

        # Return with metadata
        return dspy.Prediction(
            answer=prediction.answer,
            context=context,
            rationale=prediction.rationale
        )

# Use custom module
rag = MultiStepRAG(num_passages=5)
optimized_rag = optimizer.compile(rag, trainset=examples)
```

Optimizers: Automatic Prompt Improvement


Optimizers compile your high-level program into optimized prompts or fine-tuned weights.

BootstrapFewShot


**Best For**: Small datasets (10-50 examples), quick optimization
**Optimizes**: Few-shot examples only

```python
from dspy.teleprompt import BootstrapFewShot

# Define metric function
def accuracy_metric(example, prediction, trace=None):
    """Evaluate prediction correctness."""
    return example.answer.lower() == prediction.answer.lower()

# Configure optimizer
optimizer = BootstrapFewShot(
    metric=accuracy_metric,
    max_bootstrapped_demos=4,  # Max examples to bootstrap
    max_labeled_demos=16,      # Max labeled examples to consider
    max_rounds=1,              # Bootstrapping rounds
    max_errors=10              # Stop after N errors
)

# Training examples
trainset = [
    dspy.Example(question="What is 2+2?", answer="4").with_inputs("question"),
    dspy.Example(question="Capital of France?", answer="Paris").with_inputs("question"),
    # ... more examples
]

# Compile program
qa_module = dspy.ChainOfThought("question -> answer")
optimized_qa = optimizer.compile(
    student=qa_module,
    trainset=trainset
)

# Save optimized program
optimized_qa.save("qa_optimized.json")
```

**How It Works**:
1. Uses your program to generate outputs on training data
2. Filters successful traces using your metric
3. Selects representative examples as demonstrations
4. Returns optimized program with best few-shot examples
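
The bootstrap loop above can be sketched in plain Python. This is a simplified illustration with a stand-in program and metric (`toy_program`, `toy_metric`, and the dict-based examples are hypothetical), not DSPy's actual implementation:

```python
import random

def bootstrap_few_shot(program, trainset, metric, max_demos=4, seed=0):
    """Sketch of the bootstrap loop: run the program on training data,
    keep only traces that pass the metric, select up to max_demos."""
    successful = []
    for example in trainset:
        prediction = program(example["question"])
        if metric(example, prediction):  # filter successful traces
            successful.append({"question": example["question"], "answer": prediction})
    random.Random(seed).shuffle(successful)
    return successful[:max_demos]  # demonstrations for the optimized prompt

# Stand-in program and metric, purely for illustration
def toy_program(question):
    return {"2+2?": "4", "Capital of France?": "Paris"}.get(question, "unknown")

def toy_metric(example, prediction):
    return example["answer"].lower() == prediction.lower()

trainset = [
    {"question": "2+2?", "answer": "4"},
    {"question": "Capital of France?", "answer": "Paris"},
    {"question": "Capital of Mars?", "answer": "n/a"},
]
demos = bootstrap_few_shot(toy_program, trainset, toy_metric)
print(len(demos))  # 2: only the traces that passed the metric survive
```

The key design point is the metric-based filter: a failing trace never becomes a demonstration, so the optimized prompt only showcases behavior you have verified.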

BootstrapFewShotWithRandomSearch


**Best For**: Medium datasets (50-300 examples), better exploration
**Optimizes**: Few-shot examples with candidate exploration

```python
from dspy.teleprompt import BootstrapFewShotWithRandomSearch

config = dict(
    max_bootstrapped_demos=4,
    max_labeled_demos=4,
    num_candidate_programs=10,   # Explore 10 candidate programs
    num_threads=4                # Parallel optimization
)

optimizer = BootstrapFewShotWithRandomSearch(
    metric=accuracy_metric,
    **config
)

optimized_program = optimizer.compile(
    qa_module,
    trainset=training_examples,
    valset=validation_examples  # Optional validation set
)

# Compare candidates
print(f"Best program score: {optimizer.best_score}")
```

**Advantage**: Explores multiple candidate programs in parallel, selecting the best performer via random search.

MIPROv2 (State-of-the-Art 2025)


**Best For**: Large datasets (300+ examples), production systems
**Optimizes**: Instructions AND few-shot examples jointly via Bayesian optimization

```python
import dspy
from dspy.teleprompt import MIPROv2

# Initialize language model
lm = dspy.LM('openai/gpt-4o-mini', api_key='YOUR_API_KEY')
dspy.configure(lm=lm)

# Define comprehensive metric
def quality_metric(example, prediction, trace=None):
    """Multi-dimensional quality scoring."""
    correct = example.answer.lower() in prediction.answer.lower()
    reasonable_length = 10 < len(prediction.answer) < 200
    has_reasoning = hasattr(prediction, 'rationale') and len(prediction.rationale) > 20

    # Weighted composite score
    score = (
        correct * 1.0 +
        reasonable_length * 0.2 +
        has_reasoning * 0.3
    )
    return score / 1.5  # Normalize to [0, 1]

# Initialize MIPROv2 with auto-configuration
teleprompter = MIPROv2(
    metric=quality_metric,
    auto="medium",        # Options: "light", "medium", "heavy"
    num_candidates=10,    # Number of instruction candidates to explore
    init_temperature=1.0  # Temperature for instruction generation
)

# Optimize program
optimized_program = teleprompter.compile(
    dspy.ChainOfThought("question -> answer"),
    trainset=training_examples,
    num_trials=100,  # Bayesian optimization trials
    max_bootstrapped_demos=4,
    max_labeled_demos=8
)

# Save for production
optimized_program.save("production_qa_model.json")
```

**MIPROv2 Auto-Configuration Modes**:
- **`light`**: Fast optimization, ~20 trials, best for iteration (15-30 min)
- **`medium`**: Balanced optimization, ~50 trials, recommended default (30-60 min)
- **`heavy`**: Exhaustive search, ~100+ trials, highest quality (1-3 hours)

**How MIPROv2 Works**:
1. **Bootstrap Candidates**: Generates few-shot example candidates from training data
2. **Propose Instructions**: Creates instruction variations grounded in task dynamics
3. **Bayesian Optimization**: Uses a surrogate model to find optimal instruction + example combinations
4. **Joint Optimization**: Optimizes both components together (not separately) for synergy

**Performance Gains** (2025 Study):
- Prompt Evaluation: +38.5% accuracy (46.2% → 64.0%)
- Guardrail Enforcement: +16.9% accuracy (72.1% → 84.3%)
- Code Generation: +21.9% accuracy (58.4% → 71.2%)
- Hallucination Detection: +20.8% accuracy (65.8% → 79.5%)
- Agent Routing: +18.5% accuracy (69.3% → 82.1%)
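
To build intuition for the joint search, here is a toy exhaustive search over (instruction, demo-set) pairs with a made-up scoring function. MIPROv2 replaces the exhaustive loop with Bayesian optimization over a fixed trial budget; the candidate strings and scores below are purely illustrative:

```python
from itertools import combinations

# Hypothetical instruction and demonstration candidates
instructions = [
    "Answer the question.",
    "Answer with a short factual phrase.",
]
demo_pool = ["demo_a", "demo_b", "demo_c"]

def score(instruction, demos):
    """Stand-in for compiling the program and measuring the metric on a dev set."""
    s = 0.5
    if "short factual" in instruction:
        s += 0.2                 # this phrasing happens to help in our toy model
    s += 0.05 * len(demos)       # more demos help slightly here
    return s

# Evaluate every (instruction, demo subset) pair jointly, keep the best
best = max(
    ((inst, demos)
     for inst in instructions
     for k in range(len(demo_pool) + 1)
     for demos in combinations(demo_pool, k)),
    key=lambda pair: score(*pair),
)
print(best[0])       # "Answer with a short factual phrase."
print(len(best[1]))  # 3
```

The point of optimizing jointly is visible even in this toy: the best demo set can depend on which instruction it is paired with, so scoring the pair beats optimizing each part in isolation.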

KNN Few-Shot Selector


**Best For**: Dynamic example selection based on query similarity

```python
from dspy.teleprompt import KNNFewShot

# Requires embeddings for examples
knn_optimizer = KNNFewShot(
    k=3,  # Select 3 most similar examples
    trainset=training_examples
)
optimized_program = knn_optimizer.compile(qa_module)

# Automatically selects relevant examples at inference time:
#   math query -> retrieves math examples
#   geography query -> retrieves geography examples
```
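
Conceptually, KNN selection embeds the query and picks the nearest training examples. A minimal sketch using a toy bag-of-words similarity in place of real embeddings (`similarity`, `knn_select`, and the example data are illustrative, not DSPy internals):

```python
def similarity(a, b):
    """Toy bag-of-words Jaccard overlap; a real system would use embeddings."""
    wa, wb = set(a.lower().split()), set(b.lower().split())
    return len(wa & wb) / max(1, len(wa | wb))

def knn_select(query, examples, k=2):
    """Pick the k training examples most similar to the query."""
    ranked = sorted(examples, key=lambda ex: similarity(query, ex["question"]),
                    reverse=True)
    return ranked[:k]

examples = [
    {"question": "what is 2 plus 2", "answer": "4"},
    {"question": "what is the capital of france", "answer": "Paris"},
    {"question": "what is 3 plus 5", "answer": "8"},
]
demos = knn_select("what is 4 plus 4", examples, k=2)
print([d["answer"] for d in demos])  # ['4', '8'] -- the math examples rank first
```

Because selection happens per query, each request gets demonstrations from its own neighborhood of the training set rather than one fixed few-shot prompt.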

SignatureOptimizer


**Best For**: Optimizing signature descriptions and field specifications

```python
from dspy.teleprompt import SignatureOptimizer

sig_optimizer = SignatureOptimizer(
    metric=accuracy_metric,
    breadth=10,  # Number of variations to generate
    depth=3      # Optimization iterations
)

optimized_signature = sig_optimizer.compile(
    initial_signature=QuestionAnswer,
    trainset=trainset
)

# Use optimized signature
qa = dspy.ChainOfThought(optimized_signature)
```

Sequential Optimization Strategy


Combine optimizers for best results:
```python
# Step 1: Bootstrap few-shot examples (fast)
bootstrap = dspy.BootstrapFewShot(metric=accuracy_metric)
bootstrapped_program = bootstrap.compile(qa_module, trainset=train_examples)

# Step 2: Optimize instructions with MIPRO (comprehensive)
mipro = dspy.MIPROv2(metric=accuracy_metric, auto="medium")
final_program = mipro.compile(
    bootstrapped_program,
    trainset=train_examples,
    num_trials=50
)

# Step 3: Fine-tune signature descriptions
sig_optimizer = dspy.SignatureOptimizer(metric=accuracy_metric)
production_program = sig_optimizer.compile(final_program, trainset=train_examples)

# Save production model
production_program.save("production_optimized.json")
```

Teleprompters: Compilation Pipelines


Teleprompters orchestrate the optimization process ("teleprompter" is DSPy's legacy term for "optimizer").
**Custom Teleprompter**:
```python
class CustomTeleprompter:
    """Custom optimization pipeline."""

    def __init__(self, metric):
        self.metric = metric

    def compile(self, student, trainset, valset=None):
        # Stage 1: Bootstrap examples
        bootstrap = BootstrapFewShot(metric=self.metric)
        stage1 = bootstrap.compile(student, trainset=trainset)

        # Stage 2: Optimize instructions
        mipro = MIPROv2(metric=self.metric, auto="light")
        stage2 = mipro.compile(stage1, trainset=trainset)

        # Stage 3: Validate on held-out set
        if valset:
            score = self._evaluate(stage2, valset)
            print(f"Validation score: {score:.2%}")

        return stage2

    def _evaluate(self, program, dataset):
        correct = 0
        for example in dataset:
            prediction = program(**example.inputs())
            if self.metric(example, prediction):
                correct += 1
        return correct / len(dataset)

# Use custom teleprompter
custom_optimizer = CustomTeleprompter(metric=accuracy_metric)
optimized = custom_optimizer.compile(
    student=qa_module,
    trainset=train_examples,
    valset=val_examples
)
```

Metrics and Evaluation


Custom Metrics


**Binary Accuracy**:
```python
def exact_match(example, prediction, trace=None):
    """Exact match metric."""
    return example.answer.lower().strip() == prediction.answer.lower().strip()
```

**Fuzzy Matching**:
```python
from difflib import SequenceMatcher

def fuzzy_match(example, prediction, trace=None):
    """Fuzzy string matching."""
    similarity = SequenceMatcher(
        None,
        example.answer.lower(),
        prediction.answer.lower()
    ).ratio()
    return similarity > 0.8  # 80% similarity threshold
```

**Multi-Criteria**:
```python
def comprehensive_metric(example, prediction, trace=None):
    """Evaluate on multiple criteria."""
    # Correctness
    correct = example.answer.lower() in prediction.answer.lower()

    # Length appropriateness
    length_ok = 10 < len(prediction.answer) < 200

    # Has reasoning (if CoT)
    has_reasoning = (
        hasattr(prediction, 'rationale') and
        len(prediction.rationale) > 30
    )

    # Citation quality (if RAG)
    has_citations = (
        hasattr(prediction, 'context') and
        len(prediction.context) > 0
    )

    # Composite score, normalized by the maximum (1.0 + 0.2 + 0.3 + 0.2 = 1.7)
    score = sum([
        correct * 1.0,
        length_ok * 0.2,
        has_reasoning * 0.3,
        has_citations * 0.2
    ]) / 1.7

    return score
```

**LLM-as-Judge**:
```python
def llm_judge_metric(example, prediction, trace=None):
    """Use an LLM to evaluate quality."""
    judge_prompt = f"""
    Question: {example.question}
    Expected Answer: {example.answer}
    Predicted Answer: {prediction.answer}

    Evaluate the predicted answer on a scale of 0-10 for:
    1. Correctness
    2. Completeness
    3. Clarity

    Return only a number 0-10.
    """

    judge_lm = dspy.LM('openai/gpt-4o-mini')
    response = judge_lm(judge_prompt)
    score = float(response.strip()) / 10.0

    return score > 0.7  # Pass if score > 7/10
```

Evaluation Pipeline


```python
class Evaluator:
    """Comprehensive evaluation system."""

    def __init__(self, program, metrics):
        self.program = program
        self.metrics = metrics

    def evaluate(self, dataset, verbose=True):
        """Evaluate program on dataset."""
        results = {name: [] for name in self.metrics.keys()}

        for example in dataset:
            prediction = self.program(**example.inputs())

            for metric_name, metric_fn in self.metrics.items():
                score = metric_fn(example, prediction)
                results[metric_name].append(score)

        # Aggregate results
        aggregated = {
            name: sum(scores) / len(scores)
            for name, scores in results.items()
        }

        if verbose:
            print("\nEvaluation Results:")
            print("=" * 50)
            for name, score in aggregated.items():
                print(f"{name:20s}: {score:.2%}")

        return aggregated

# Use evaluator
evaluator = Evaluator(
    program=optimized_qa,
    metrics={
        "accuracy": exact_match,
        "fuzzy_match": fuzzy_match,
        "quality": comprehensive_metric
    }
)
scores = evaluator.evaluate(test_dataset)
```

Language Model Configuration


Supported Providers


**OpenAI**:
```python
import dspy

lm = dspy.LM('openai/gpt-4o', api_key='YOUR_API_KEY')
dspy.configure(lm=lm)

# With custom settings
lm = dspy.LM(
    'openai/gpt-4o-mini',
    api_key='YOUR_API_KEY',
    temperature=0.7,
    max_tokens=1024
)
```

**Anthropic Claude**:
```python
lm = dspy.LM(
    'anthropic/claude-3-5-sonnet-20241022',
    api_key='YOUR_ANTHROPIC_KEY',
    max_tokens=4096
)
dspy.configure(lm=lm)

# Claude Opus for complex reasoning
lm_opus = dspy.LM('anthropic/claude-3-opus-20240229', api_key=key)
```

**Local Models (Ollama)**:
```python
# Requires Ollama running locally
lm = dspy.LM('ollama/llama3.1:70b', api_base='http://localhost:11434')
dspy.configure(lm=lm)

# Mixtral
lm = dspy.LM('ollama/mixtral:8x7b')
```

**Multiple Models**:
```python
# Use different models for different stages
strong_lm = dspy.LM('openai/gpt-4o')
fast_lm = dspy.LM('openai/gpt-4o-mini')

# Configure per module
class HybridPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        # Fast model for retrieval
        self.retrieve = dspy.Retrieve(k=5)

        # Strong model for reasoning
        with dspy.context(lm=strong_lm):
            self.reason = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        context = self.retrieve(question).passages
        return self.reason(context=context, question=question)
```

Model Selection Strategy


python
def select_model(task_complexity, budget):
    """Select appropriate model based on task and budget."""
    models = {
        "simple": [
            ("openai/gpt-4o-mini", 0.15),  # (model, cost per 1M tokens)
            ("anthropic/claude-3-haiku-20240307", 0.25),
        ],
        "medium": [
            ("openai/gpt-4o", 2.50),
            ("anthropic/claude-3-5-sonnet-20241022", 3.00),
        ],
        "complex": [
            ("anthropic/claude-3-opus-20240229", 15.00),
            ("openai/o1-preview", 15.00),
        ]
    }

    candidates = models[task_complexity]
    affordable = [m for m, cost in candidates if cost <= budget]

    return affordable[0] if affordable else candidates[0][0]
python
def select_model(task_complexity, budget):
    """根据任务和预算选择合适的模型。"""
    models = {
        "simple": [
            ("openai/gpt-4o-mini", 0.15),  # (模型, 每1M令牌成本)
            ("anthropic/claude-3-haiku-20240307", 0.25),
        ],
        "medium": [
            ("openai/gpt-4o", 2.50),
            ("anthropic/claude-3-5-sonnet-20241022", 3.00),
        ],
        "complex": [
            ("anthropic/claude-3-opus-20240229", 15.00),
            ("openai/o1-preview", 15.00),
        ]
    }

    candidates = models[task_complexity]
    affordable = [m for m, cost in candidates if cost <= budget]

    return affordable[0] if affordable else candidates[0][0]

Use in optimization

在优化中使用

task = "complex"
model = select_model(task, budget=10.0)
lm = dspy.LM(model)
dspy.configure(lm=lm)

Program Composition

程序组合

Chaining Modules

模块链式调用

python
class MultiStepPipeline(dspy.Module):
    """Chain multiple reasoning steps."""

    def __init__(self):
        super().__init__()
        self.step1 = dspy.ChainOfThought("question -> subtasks")
        self.step2 = dspy.ChainOfThought("subtask -> solution")
        self.step3 = dspy.ChainOfThought("solutions -> final_answer")

    def forward(self, question):
        # Break down question
        decomposition = self.step1(question=question)

        # Solve each subtask
        solutions = []
        for subtask in decomposition.subtasks.split('\n'):
            if subtask.strip():
                sol = self.step2(subtask=subtask)
                solutions.append(sol.solution)

        # Synthesize final answer
        combined = '\n'.join(solutions)
        final = self.step3(solutions=combined)

        return dspy.Prediction(
            answer=final.final_answer,
            subtasks=decomposition.subtasks,
            solutions=solutions
        )
python
class MultiStepPipeline(dspy.Module):
    """链式调用多个推理步骤。"""

    def __init__(self):
        super().__init__()
        self.step1 = dspy.ChainOfThought("question -> subtasks")
        self.step2 = dspy.ChainOfThought("subtask -> solution")
        self.step3 = dspy.ChainOfThought("solutions -> final_answer")

    def forward(self, question):
        # 拆解问题
        decomposition = self.step1(question=question)

        # 解决每个子任务
        solutions = []
        for subtask in decomposition.subtasks.split('\n'):
            if subtask.strip():
                sol = self.step2(subtask=subtask)
                solutions.append(sol.solution)

        # 合成最终答案
        combined = '\n'.join(solutions)
        final = self.step3(solutions=combined)

        return dspy.Prediction(
            answer=final.final_answer,
            subtasks=decomposition.subtasks,
            solutions=solutions
        )

Optimize entire pipeline

优化整个流水线

pipeline = MultiStepPipeline()
optimizer = MIPROv2(metric=quality_metric, auto="medium")
optimized_pipeline = optimizer.compile(pipeline, trainset=examples)
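The pipeline above splits `decomposition.subtasks` on raw newlines, but models frequently emit numbered or bulleted lists. A minimal stdlib sketch of more robust parsing (the `parse_subtasks` helper is illustrative, not part of DSPy):

```python
import re

def parse_subtasks(raw: str) -> list[str]:
    """Split an LM-generated subtask list into clean items.

    Strips common list prefixes ("1.", "2)", "-", "*") that models
    often emit, so downstream modules receive bare subtask text.
    """
    items = []
    for line in raw.split("\n"):
        # Remove leading numbering or bullet markers, then whitespace
        line = re.sub(r"^\s*(?:\d+[.)]|[-*])\s*", "", line).strip()
        if line:
            items.append(line)
    return items
```

Plugging this into `forward` in place of the bare `split('\n')` loop makes the decomposition step tolerant of formatting drift between optimizer runs.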

Conditional Branching

条件分支

python
class AdaptivePipeline(dspy.Module):
    """Adapt reasoning based on query type."""

    def __init__(self):
        super().__init__()
        self.classifier = dspy.ChainOfThought("question -> category")
        self.math_solver = dspy.ProgramOfThought("problem -> solution")
        self.fact_qa = dspy.ChainOfThought("question -> answer")
        self.creative = dspy.ChainOfThought("prompt -> response")

    def forward(self, question):
        # Classify query type
        category = self.classifier(question=question).category.lower()

        # Route to appropriate module
        if "math" in category or "calculation" in category:
            return self.math_solver(problem=question)
        elif "creative" in category or "story" in category:
            return self.creative(prompt=question)
        else:
            return self.fact_qa(question=question)
python
class AdaptivePipeline(dspy.Module):
    """根据查询类型调整推理方式。"""

    def __init__(self):
        super().__init__()
        self.classifier = dspy.ChainOfThought("question -> category")
        self.math_solver = dspy.ProgramOfThought("problem -> solution")
        self.fact_qa = dspy.ChainOfThought("question -> answer")
        self.creative = dspy.ChainOfThought("prompt -> response")

    def forward(self, question):
        # 分类查询类型
        category = self.classifier(question=question).category.lower()

        # 路由到合适的模块
        if "math" in category or "calculation" in category:
            return self.math_solver(problem=question)
        elif "creative" in category or "story" in category:
            return self.creative(prompt=question)
        else:
            return self.fact_qa(question=question)

Optimize each branch independently

独立优化每个分支

adaptive = AdaptivePipeline()
optimized_adaptive = optimizer.compile(adaptive, trainset=diverse_examples)

Production Deployment

生产环境部署

Saving and Loading Models

模型的保存与加载

python

Save optimized program

保存优化后的程序

optimized_program.save("models/qa_v1.0.0.json")

Load in production

在生产环境中加载

production_qa = dspy.ChainOfThought("question -> answer")
production_qa.load("models/qa_v1.0.0.json")

Use loaded model

使用加载后的模型

response = production_qa(question="What is quantum computing?")
response = production_qa(question="什么是量子计算?")

Version Control

版本控制

python
import json
from datetime import datetime

class ModelRegistry:
    """Version control for DSPy models."""

    def __init__(self, registry_path="models/registry.json"):
        self.registry_path = registry_path
        self.registry = self._load_registry()

    def register(self, name, version, model_path, metadata=None):
        """Register a model version."""
        model_id = f"{name}:v{version}"

        self.registry[model_id] = {
            "name": name,
            "version": version,
            "path": model_path,
            "created_at": datetime.utcnow().isoformat(),
            "metadata": metadata or {}
        }

        self._save_registry()
        return model_id

    def get_model(self, name, version="latest"):
        """Load model by name and version."""
        if version == "latest":
            versions = [
                v for k, v in self.registry.items()
                if v["name"] == name
            ]
            if not versions:
                raise ValueError(f"No versions found for {name}")

            latest = max(versions, key=lambda x: x["created_at"])
            model_path = latest["path"]
        else:
            model_id = f"{name}:v{version}"
            model_path = self.registry[model_id]["path"]

        # Load model
        module = dspy.ChainOfThought("question -> answer")
        module.load(model_path)
        return module

    def _load_registry(self):
        try:
            with open(self.registry_path, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def _save_registry(self):
        with open(self.registry_path, 'w') as f:
            json.dump(self.registry, f, indent=2)
python
import json
from datetime import datetime

class ModelRegistry:
    """DSPy模型的版本控制。"""

    def __init__(self, registry_path="models/registry.json"):
        self.registry_path = registry_path
        self.registry = self._load_registry()

    def register(self, name, version, model_path, metadata=None):
        """注册模型版本。"""
        model_id = f"{name}:v{version}"

        self.registry[model_id] = {
            "name": name,
            "version": version,
            "path": model_path,
            "created_at": datetime.utcnow().isoformat(),
            "metadata": metadata or {}
        }

        self._save_registry()
        return model_id

    def get_model(self, name, version="latest"):
        """按名称和版本加载模型。"""
        if version == "latest":
            versions = [
                v for k, v in self.registry.items()
                if v["name"] == name
            ]
            if not versions:
                raise ValueError(f"未找到{name}的任何版本")

            latest = max(versions, key=lambda x: x["created_at"])
            model_path = latest["path"]
        else:
            model_id = f"{name}:v{version}"
            model_path = self.registry[model_id]["path"]

        # 加载模型
        module = dspy.ChainOfThought("question -> answer")
        module.load(model_path)
        return module

    def _load_registry(self):
        try:
            with open(self.registry_path, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def _save_registry(self):
        with open(self.registry_path, 'w') as f:
            json.dump(self.registry, f, indent=2)

Use registry

使用注册表

registry = ModelRegistry()

Register new version

注册新版本

registry.register(
    name="qa_assistant",
    version="1.0.0",
    model_path="models/qa_v1.0.0.json",
    metadata={
        "accuracy": 0.87,
        "optimizer": "MIPROv2",
        "training_examples": 500
    }
)

Load for production

加载生产级模型

qa = registry.get_model("qa_assistant", version="latest")
qa = registry.get_model("qa_assistant", version="latest")
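`get_model` above resolves `"latest"` by `created_at` timestamp, which breaks down if versions are ever registered out of order. A hedged sketch of comparing semantic version strings numerically instead (lexicographic string comparison would rank "1.9.0" above "1.10.0"; the helper name is illustrative):

```python
def latest_version(versions: list[str]) -> str:
    """Pick the highest semantic version from strings like '1.10.2'.

    Compares numeric tuples rather than raw strings, so '1.10.0'
    correctly outranks '1.9.0'.
    """
    return max(versions, key=lambda v: tuple(int(p) for p in v.split(".")))
```

Inside `ModelRegistry.get_model`, this could replace the `created_at`-based `max(...)` by collecting each entry's `version` field first.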

Monitoring and Logging

监控与日志

python
import json
import logging
from datetime import datetime

class DSPyMonitor:
    """Monitor DSPy program execution."""

    def __init__(self, program, log_file="logs/dspy.log"):
        self.program = program
        self.logger = self._setup_logger(log_file)
        self.metrics = []

    def __call__(self, **kwargs):
        """Wrap program execution with monitoring."""
        start_time = datetime.utcnow()

        try:
            # Execute program
            result = self.program(**kwargs)

            # Log success
            duration = (datetime.utcnow() - start_time).total_seconds()
            self._log_execution(
                status="success",
                inputs=kwargs,
                outputs=result,
                duration=duration
            )

            return result

        except Exception as e:
            # Log error
            duration = (datetime.utcnow() - start_time).total_seconds()
            self._log_execution(
                status="error",
                inputs=kwargs,
                error=str(e),
                duration=duration
            )
            raise

    def _log_execution(self, status, inputs, duration, outputs=None, error=None):
        """Log execution details."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "status": status,
            "inputs": inputs,
            "duration_seconds": duration
        }

        if outputs:
            log_entry["outputs"] = str(outputs)
        if error:
            log_entry["error"] = error

        self.logger.info(json.dumps(log_entry))
        self.metrics.append(log_entry)

    def _setup_logger(self, log_file):
        """Setup logging."""
        logger = logging.getLogger("dspy_monitor")
        logger.setLevel(logging.INFO)

        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(message)s')
        )
        logger.addHandler(handler)

        return logger

    def get_stats(self):
        """Get execution statistics."""
        if not self.metrics:
            return {}

        successes = [m for m in self.metrics if m["status"] == "success"]
        errors = [m for m in self.metrics if m["status"] == "error"]

        return {
            "total_calls": len(self.metrics),
            "success_rate": len(successes) / len(self.metrics),
            "error_rate": len(errors) / len(self.metrics),
            "avg_duration": sum(m["duration_seconds"] for m in self.metrics) / len(self.metrics),
            "errors": [m["error"] for m in errors]
        }
python
import json
import logging
from datetime import datetime

class DSPyMonitor:
    """监控DSPy程序执行。"""

    def __init__(self, program, log_file="logs/dspy.log"):
        self.program = program
        self.logger = self._setup_logger(log_file)
        self.metrics = []

    def __call__(self, **kwargs):
        """用监控包装程序执行。"""
        start_time = datetime.utcnow()

        try:
            # 执行程序
            result = self.program(**kwargs)

            # 记录成功
            duration = (datetime.utcnow() - start_time).total_seconds()
            self._log_execution(
                status="success",
                inputs=kwargs,
                outputs=result,
                duration=duration
            )

            return result

        except Exception as e:
            # 记录错误
            duration = (datetime.utcnow() - start_time).total_seconds()
            self._log_execution(
                status="error",
                inputs=kwargs,
                error=str(e),
                duration=duration
            )
            raise

    def _log_execution(self, status, inputs, duration, outputs=None, error=None):
        """记录执行细节。"""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "status": status,
            "inputs": inputs,
            "duration_seconds": duration
        }

        if outputs:
            log_entry["outputs"] = str(outputs)
        if error:
            log_entry["error"] = error

        self.logger.info(json.dumps(log_entry))
        self.metrics.append(log_entry)

    def _setup_logger(self, log_file):
        """配置日志。"""
        logger = logging.getLogger("dspy_monitor")
        logger.setLevel(logging.INFO)

        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(message)s')
        )
        logger.addHandler(handler)

        return logger

    def get_stats(self):
        """获取执行统计数据。"""
        if not self.metrics:
            return {}

        successes = [m for m in self.metrics if m["status"] == "success"]
        errors = [m for m in self.metrics if m["status"] == "error"]

        return {
            "total_calls": len(self.metrics),
            "success_rate": len(successes) / len(self.metrics),
            "error_rate": len(errors) / len(self.metrics),
            "avg_duration": sum(m["duration_seconds"] for m in self.metrics) / len(self.metrics),
            "errors": [m["error"] for m in errors]
        }

Use monitor

使用监控器

monitored_qa = DSPyMonitor(optimized_qa)
result = monitored_qa(question="What is AI?")
monitored_qa = DSPyMonitor(optimized_qa)
result = monitored_qa(question="什么是AI?")

Check stats

查看统计数据

stats = monitored_qa.get_stats()
print(f"Success rate: {stats['success_rate']:.2%}")
stats = monitored_qa.get_stats()
print(f"成功率: {stats['success_rate']:.2%}")
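`get_stats` above reports only the mean duration, but tail latency usually matters more for production LLM calls, where a few slow requests can dominate the average. A small stdlib sketch of a percentile summary (a hypothetical helper, not part of DSPy) that could consume the monitor's `duration_seconds` values:

```python
import statistics

def latency_percentiles(durations: list[float]) -> dict:
    """Summarize call latencies for dashboarding.

    Quantiles give a fuller picture than the mean alone; p95/p99
    expose the slow tail that averages hide.
    """
    qs = statistics.quantiles(durations, n=100)
    return {
        "p50": statistics.median(durations),
        "p95": qs[94],
        "p99": qs[98],
    }
```

This could extend `get_stats` via `latency_percentiles([m["duration_seconds"] for m in self.metrics])` once enough calls have been recorded.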

Integration with LangSmith

与LangSmith集成

Evaluate DSPy programs using LangSmith:
python
import os
from langsmith import Client
from langsmith.evaluation import evaluate
使用LangSmith评估DSPy程序:
python
import os
from langsmith import Client
from langsmith.evaluation import evaluate

Setup

配置

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"

client = Client()

Wrap DSPy program for LangSmith

为LangSmith包装DSPy程序

def dspy_wrapper(inputs: dict) -> dict:
    """Wrapper for LangSmith evaluation."""
    question = inputs["question"]
    result = optimized_qa(question=question)
    return {"answer": result.answer}
def dspy_wrapper(inputs: dict) -> dict:
    """LangSmith评估的包装器。"""
    question = inputs["question"]
    result = optimized_qa(question=question)
    return {"answer": result.answer}

Define evaluator

定义评估器

def dspy_evaluator(run, example):
    """Evaluate DSPy output."""
    predicted = run.outputs["answer"]
    expected = example.outputs["answer"]

    return {
        "key": "correctness",
        "score": 1.0 if expected.lower() in predicted.lower() else 0.0
    }
def dspy_evaluator(run, example):
    """评估DSPy输出。"""
    predicted = run.outputs["answer"]
    expected = example.outputs["answer"]

    return {
        "key": "correctness",
        "score": 1.0 if expected.lower() in predicted.lower() else 0.0
    }

Create dataset

创建数据集

dataset = client.create_dataset(
    dataset_name="dspy_qa_eval",
    description="DSPy QA evaluation dataset"
)
dataset = client.create_dataset(
    dataset_name="dspy_qa_eval",
    description="DSPy QA评估数据集"
)

Add examples

添加示例

for example in test_examples:
    client.create_example(
        dataset_id=dataset.id,
        inputs={"question": example.question},
        outputs={"answer": example.answer}
    )

Run evaluation

运行评估

results = evaluate(
    dspy_wrapper,
    data="dspy_qa_eval",
    evaluators=[dspy_evaluator],
    experiment_prefix="dspy_v1.0"
)

print(f"Average correctness: {results['results']['correctness']:.2%}")
results = evaluate(
    dspy_wrapper,
    data="dspy_qa_eval",
    evaluators=[dspy_evaluator],
    experiment_prefix="dspy_v1.0"
)

print(f"平均正确率: {results['results']['correctness']:.2%}")
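The `dspy_evaluator` above scores correctness by case-insensitive substring containment, which gives a hard 0 for partially correct answers. A common alternative in QA evaluation is token-level F1, sketched here with the stdlib only (the helper name is illustrative, not a LangSmith API):

```python
import re
from collections import Counter

def token_f1(predicted: str, expected: str) -> float:
    """Token-level F1 between prediction and reference.

    More forgiving than substring containment: partial answers
    get partial credit instead of a hard 0.
    """
    def tok(s):
        return re.findall(r"\w+", s.lower())

    p, e = Counter(tok(predicted)), Counter(tok(expected))
    overlap = sum((p & e).values())  # multiset intersection size
    if overlap == 0:
        return 0.0
    precision = overlap / sum(p.values())
    recall = overlap / sum(e.values())
    return 2 * precision * recall / (precision + recall)
```

Returning `{"key": "f1", "score": token_f1(predicted, expected)}` from the evaluator would surface graded scores in LangSmith experiments.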

Real-World Examples

实际应用示例

RAG Pipeline

RAG流水线

python
class ProductionRAG(dspy.Module):
    """Production-ready RAG system."""

    def __init__(self, k=5):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)

        # Multi-stage reasoning
        self.rerank = dspy.ChainOfThought(
            "question, passages -> relevant_passages"
        )
        self.generate = dspy.ChainOfThought(
            "question, context -> answer, citations"
        )

    def forward(self, question):
        # Retrieve candidate passages
        candidates = self.retrieve(question).passages

        # Rerank for relevance
        reranked = self.rerank(
            question=question,
            passages="\n---\n".join(candidates)
        )

        # Generate answer with citations
        result = self.generate(
            question=question,
            context=reranked.relevant_passages
        )

        return dspy.Prediction(
            answer=result.answer,
            citations=result.citations,
            passages=candidates
        )
python
class ProductionRAG(dspy.Module):
    """生产级RAG系统。"""

    def __init__(self, k=5):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)

        # 多阶段推理
        self.rerank = dspy.ChainOfThought(
            "question, passages -> relevant_passages"
        )
        self.generate = dspy.ChainOfThought(
            "question, context -> answer, citations"
        )

    def forward(self, question):
        # 检索候选段落
        candidates = self.retrieve(question).passages

        # 重新排序以提升相关性
        reranked = self.rerank(
            question=question,
            passages="\n---\n".join(candidates)
        )

        # 生成带引用的答案
        result = self.generate(
            question=question,
            context=reranked.relevant_passages
        )

        return dspy.Prediction(
            answer=result.answer,
            citations=result.citations,
            passages=candidates
        )

Optimize RAG pipeline

优化RAG流水线

rag = ProductionRAG(k=10)
def rag_metric(example, prediction, trace=None):
    """Evaluate RAG quality."""
    answer_correct = example.answer.lower() in prediction.answer.lower()
    has_citations = len(prediction.citations) > 0
    return answer_correct and has_citations

optimizer = MIPROv2(metric=rag_metric, auto="heavy")
optimized_rag = optimizer.compile(rag, trainset=rag_examples)
optimized_rag.save("models/rag_production.json")
rag = ProductionRAG(k=10)

def rag_metric(example, prediction, trace=None):
    """评估RAG质量。"""
    answer_correct = example.answer.lower() in prediction.answer.lower()
    has_citations = len(prediction.citations) > 0
    return answer_correct and has_citations

optimizer = MIPROv2(metric=rag_metric, auto="heavy")
optimized_rag = optimizer.compile(rag, trainset=rag_examples)
optimized_rag.save("models/rag_production.json")

Classification

分类任务

python
class SentimentClassifier(dspy.Module):
    """Multi-class sentiment classification."""

    def __init__(self, classes):
        super().__init__()
        self.classes = classes

        class ClassificationSig(dspy.Signature):
            text = dspy.InputField()
            reasoning = dspy.OutputField(desc="step-by-step reasoning")
            sentiment = dspy.OutputField(desc=f"one of: {', '.join(classes)}")
            confidence = dspy.OutputField(desc="confidence score 0-1")

        self.classify = dspy.ChainOfThought(ClassificationSig)

    def forward(self, text):
        result = self.classify(text=text)

        # Validate output
        if result.sentiment not in self.classes:
            result.sentiment = "neutral"  # Fallback

        return result
python
class SentimentClassifier(dspy.Module):
    """多分类情感分类器。"""

    def __init__(self, classes):
        super().__init__()
        self.classes = classes

        class ClassificationSig(dspy.Signature):
            text = dspy.InputField()
            reasoning = dspy.OutputField(desc="逐步推理过程")
            sentiment = dspy.OutputField(desc=f"可选值: {', '.join(classes)}")
            confidence = dspy.OutputField(desc="置信度分数0-1")

        self.classify = dspy.ChainOfThought(ClassificationSig)

    def forward(self, text):
        result = self.classify(text=text)

        # 验证输出
        if result.sentiment not in self.classes:
            result.sentiment = "neutral"  # 回退默认值

        return result

Train classifier

训练分类器

classes = ["positive", "negative", "neutral"]
classifier = SentimentClassifier(classes)

def classification_metric(example, prediction, trace=None):
    return example.sentiment == prediction.sentiment

optimizer = BootstrapFewShot(metric=classification_metric)
optimized_classifier = optimizer.compile(
    classifier,
    trainset=sentiment_examples
)

Use in production

在生产环境中使用

result = optimized_classifier(text="This product is amazing!")
print(f"Sentiment: {result.sentiment} ({result.confidence})")
result = optimized_classifier(text="这个产品太棒了!")
print(f"情感: {result.sentiment} ({result.confidence})")
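The classifier's fallback check (`result.sentiment not in self.classes`) requires an exact match, but models often answer with variants like "Positive." or "The sentiment is positive". A stdlib sketch of label normalization that could run before the fallback (the helper is hypothetical, not a DSPy API):

```python
def normalize_label(raw: str, classes: list[str], default: str = "neutral") -> str:
    """Map a free-form LM label onto a fixed class set.

    Lowercases and strips trailing punctuation, then falls back to a
    substring check before giving up, since models often decorate
    the label with extra words or punctuation.
    """
    cleaned = raw.strip().lower().rstrip(".!")
    if cleaned in classes:
        return cleaned
    for c in classes:
        if c in cleaned:
            return c
    return default
```

In `SentimentClassifier.forward`, `result.sentiment = normalize_label(result.sentiment, self.classes)` would salvage many near-miss outputs instead of discarding them.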

Summarization

摘要任务

python
class DocumentSummarizer(dspy.Module):
    """Hierarchical document summarization."""

    def __init__(self):
        super().__init__()

        # Chunk-level summaries
        self.chunk_summary = dspy.ChainOfThought(
            "chunk -> summary"
        )

        # Document-level synthesis
        self.final_summary = dspy.ChainOfThought(
            "chunk_summaries -> final_summary, key_points"
        )

    def forward(self, document, chunk_size=1000):
        # Split document into chunks
        chunks = self._chunk_document(document, chunk_size)

        # Summarize each chunk
        chunk_summaries = []
        for chunk in chunks:
            summary = self.chunk_summary(chunk=chunk)
            chunk_summaries.append(summary.summary)

        # Synthesize final summary
        combined = "\n---\n".join(chunk_summaries)
        final = self.final_summary(chunk_summaries=combined)

        return dspy.Prediction(
            summary=final.final_summary,
            key_points=final.key_points.split('\n'),
            chunk_count=len(chunks)
        )

    def _chunk_document(self, document, chunk_size):
        """Split document into chunks."""
        words = document.split()
        chunks = []

        for i in range(0, len(words), chunk_size):
            chunk = ' '.join(words[i:i + chunk_size])
            chunks.append(chunk)

        return chunks
python
class DocumentSummarizer(dspy.Module):
    """分层文档摘要器。"""

    def __init__(self):
        super().__init__()

        # 段落级摘要
        self.chunk_summary = dspy.ChainOfThought(
            "chunk -> summary"
        )

        # 文档级合成
        self.final_summary = dspy.ChainOfThought(
            "chunk_summaries -> final_summary, key_points"
        )

    def forward(self, document, chunk_size=1000):
        # 将文档拆分为段落
        chunks = self._chunk_document(document, chunk_size)

        # 为每个段落生成摘要
        chunk_summaries = []
        for chunk in chunks:
            summary = self.chunk_summary(chunk=chunk)
            chunk_summaries.append(summary.summary)

        # 合成最终摘要
        combined = "\n---\n".join(chunk_summaries)
        final = self.final_summary(chunk_summaries=combined)

        return dspy.Prediction(
            summary=final.final_summary,
            key_points=final.key_points.split('\n'),
            chunk_count=len(chunks)
        )

    def _chunk_document(self, document, chunk_size):
        """将文档拆分为段落。"""
        words = document.split()
        chunks = []

        for i in range(0, len(words), chunk_size):
            chunk = ' '.join(words[i:i + chunk_size])
            chunks.append(chunk)

        return chunks

Optimize summarizer

优化摘要器

summarizer = DocumentSummarizer()
def summary_metric(example, prediction, trace=None):
    # Check key points coverage
    key_points_present = sum(
        1 for kp in example.key_points
        if kp.lower() in prediction.summary.lower()
    )
    coverage = key_points_present / len(example.key_points)

    # Check length appropriateness
    length_ok = 100 < len(prediction.summary) < 500

    return coverage > 0.7 and length_ok

optimizer = MIPROv2(metric=summary_metric, auto="medium")
optimized_summarizer = optimizer.compile(summarizer, trainset=summary_examples)
summarizer = DocumentSummarizer()
def summary_metric(example, prediction, trace=None):
    # 检查要点覆盖率
    key_points_present = sum(
        1 for kp in example.key_points
        if kp.lower() in prediction.summary.lower()
    )
    coverage = key_points_present / len(example.key_points)

    # 检查长度合理性
    length_ok = 100 < len(prediction.summary) < 500

    return coverage > 0.7 and length_ok

optimizer = MIPROv2(metric=summary_metric, auto="medium")
optimized_summarizer = optimizer.compile(summarizer, trainset=summary_examples)
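`_chunk_document` above uses non-overlapping word windows, so sentences straddling a chunk boundary are split between two summaries. A sketch of an overlap-aware variant (a common alternative; the function name and default sizes are illustrative):

```python
def chunk_with_overlap(document: str, chunk_size: int = 1000,
                       overlap: int = 100) -> list[str]:
    """Word-window chunking with overlap between consecutive chunks.

    Overlapping windows keep content that straddles a boundary
    visible to both neighboring chunk summaries.
    """
    if chunk_size <= overlap:
        raise ValueError("chunk_size must exceed overlap")

    words = document.split()
    step = chunk_size - overlap  # advance less than a full window
    return [
        " ".join(words[i:i + chunk_size])
        for i in range(0, max(len(words) - overlap, 1), step)
    ]
```

Swapping this in for `_chunk_document` trades some extra tokens per chunk for fewer facts lost at boundaries.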

Question Answering

多跳问答

python
class MultiHopQA(dspy.Module):
    """Multi-hop question answering."""

    def __init__(self):
        super().__init__()

        # Decompose complex questions
        self.decompose = dspy.ChainOfThought(
            "question -> subquestions"
        )

        # Answer subquestions with retrieval
        self.retrieve = dspy.Retrieve(k=3)
        self.answer_subq = dspy.ChainOfThought(
            "subquestion, context -> answer"
        )

        # Synthesize final answer
        self.synthesize = dspy.ChainOfThought(
            "question, subanswers -> final_answer, reasoning"
        )

    def forward(self, question):
        # Decompose into subquestions
        decomp = self.decompose(question=question)
        subquestions = [
            sq.strip()
            for sq in decomp.subquestions.split('\n')
            if sq.strip()
        ]

        # Answer each subquestion
        subanswers = []
        for subq in subquestions:
            context = self.retrieve(subq).passages
            answer = self.answer_subq(
                subquestion=subq,
                context="\n".join(context)
            )
            subanswers.append(answer.answer)

        # Synthesize final answer
        combined = "\n".join([
            f"Q: {sq}\nA: {sa}"
            for sq, sa in zip(subquestions, subanswers)
        ])

        final = self.synthesize(
            question=question,
            subanswers=combined
        )

        return dspy.Prediction(
            answer=final.final_answer,
            reasoning=final.reasoning,
            subquestions=subquestions,
            subanswers=subanswers
        )
python
class MultiHopQA(dspy.Module):
    """多跳问答系统。"""

    def __init__(self):
        super().__init__()

        # 拆解复杂问题
        self.decompose = dspy.ChainOfThought(
            "question -> subquestions"
        )

        # 结合检索回答子问题
        self.retrieve = dspy.Retrieve(k=3)
        self.answer_subq = dspy.ChainOfThought(
            "subquestion, context -> answer"
        )

        # 合成最终答案
        self.synthesize = dspy.ChainOfThought(
            "question, subanswers -> final_answer, reasoning"
        )

    def forward(self, question):
        # 拆解为子问题
        decomp = self.decompose(question=question)
        subquestions = [
            sq.strip()
            for sq in decomp.subquestions.split('\n')
            if sq.strip()
        ]

        # 回答每个子问题
        subanswers = []
        for subq in subquestions:
            context = self.retrieve(subq).passages
            answer = self.answer_subq(
                subquestion=subq,
                context="\n".join(context)
            )
            subanswers.append(answer.answer)

        # 合成最终答案
        combined = "\n".join([
            f"Q: {sq}\nA: {sa}"
            for sq, sa in zip(subquestions, subanswers)
        ])

        final = self.synthesize(
            question=question,
            subanswers=combined
        )

        return dspy.Prediction(
            answer=final.final_answer,
            reasoning=final.reasoning,
            subquestions=subquestions,
            subanswers=subanswers
        )

Optimize multi-hop QA

优化多跳QA系统

multihop_qa = MultiHopQA()
def multihop_metric(example, prediction, trace=None):
    # Check answer correctness
    correct = example.answer.lower() in prediction.answer.lower()

    # Check reasoning quality
    has_reasoning = len(prediction.reasoning) > 50

    # Check subquestion coverage
    has_subquestions = len(prediction.subquestions) >= 2

    return correct and has_reasoning and has_subquestions

optimizer = MIPROv2(metric=multihop_metric, auto="heavy")
optimized_multihop = optimizer.compile(multihop_qa, trainset=multihop_examples)
multihop_qa = MultiHopQA()
def multihop_metric(example, prediction, trace=None):
    # 检查答案正确性
    correct = example.answer.lower() in prediction.answer.lower()

    # 检查推理质量
    has_reasoning = len(prediction.reasoning) > 50

    # 检查子问题覆盖率
    has_subquestions = len(prediction.subquestions) >= 2

    return correct and has_reasoning and has_subquestions

optimizer = MIPROv2(metric=multihop_metric, auto="heavy")
optimized_multihop = optimizer.compile(multihop_qa, trainset=multihop_examples)

Migration from Manual Prompting

从手动提示词工程迁移

Before: Manual Prompting

之前:手动提示词工程

python

Manual prompt engineering

手动提示词工程

PROMPT = """You are a helpful assistant. Answer questions accurately and concisely.

Examples:
Q: What is 2+2?
A: 4

Q: Capital of France?
A: Paris

Q: {question}
A: """

def manual_qa(question):
    response = llm.invoke(PROMPT.format(question=question))
    return response
PROMPT = """你是一个乐于助人的助手。请准确、简洁地回答问题。

示例:
Q: 2+2等于多少?
A: 4

Q: 法国的首都是?
A: Paris

Q: {question}
A: """

def manual_qa(question):
    response = llm.invoke(PROMPT.format(question=question))
    return response

After: DSPy

之后:使用DSPy

python

DSPy declarative approach

DSPy声明式方法

class QA(dspy.Signature):
    """Answer questions accurately and concisely."""
    question = dspy.InputField()
    answer = dspy.OutputField(desc="short factual answer")
qa = dspy.ChainOfThought(QA)
class QA(dspy.Signature):
    """准确、简洁地回答问题。"""
    question = dspy.InputField()
    answer = dspy.OutputField(desc="简短事实性答案")
qa = dspy.ChainOfThought(QA)

Optimize automatically

自动优化

optimizer = MIPROv2(metric=accuracy_metric, auto="medium")
optimized_qa = optimizer.compile(qa, trainset=examples)

def dspy_qa(question):
    result = optimized_qa(question=question)
    return result.answer

**Benefits**:
- Systematic optimization vs. manual trial-and-error
- Version control and reproducibility
- Automatic adaptation to new models
- Performance gains: accuracy improvements of 18-38% reported on benchmark tasks

**优势**:
- 系统化优化 vs 手动试错
- 版本控制与可复现性
- 自动适配新模型
- 性能提升:准确率提升18-38%

Best Practices


Data Preparation


python

Create high-quality training examples


def prepare_training_data(raw_data):
    """Convert raw data to DSPy examples."""
    examples = []

    for item in raw_data:
        example = dspy.Example(
            question=item["question"],
            answer=item["answer"],
            context=item.get("context", "")  # Optional fields
        ).with_inputs("question", "context")  # Mark input fields

        examples.append(example)

    return examples

Split data properly


def train_val_test_split(examples, train=0.7, val=0.15, test=0.15):
    """Split data for optimization and evaluation."""
    import random
    random.shuffle(examples)

    n = len(examples)
    train_end = int(n * train)
    val_end = int(n * (train + val))

    return {
        "train": examples[:train_end],
        "val": examples[train_end:val_end],
        "test": examples[val_end:]
    }
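As a quick sanity check of the ratios above, splitting 100 items with the default 0.7/0.15/0.15 settings yields a 70/15/15 partition with no overlap. A self-contained sketch using integers as stand-in examples (note that `random.shuffle` is in-place and unseeded, so seed first if you need reproducible splits):

```python
import random

def train_val_test_split(examples, train=0.7, val=0.15, test=0.15):
    # Same logic as above: shuffle in place, then slice by cumulative ratio.
    random.shuffle(examples)
    n = len(examples)
    train_end = int(n * train)
    val_end = int(n * (train + val))
    return {
        "train": examples[:train_end],
        "val": examples[train_end:val_end],
        "test": examples[val_end:],
    }

random.seed(42)  # seed first for reproducible splits
data = train_val_test_split(list(range(100)))
sizes = (len(data["train"]), len(data["val"]), len(data["test"]))
print(sizes, sum(sizes))  # (70, 15, 15) 100
```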

Use split data


data = train_val_test_split(all_examples)

optimized = optimizer.compile(
    program,
    trainset=data["train"],
    valset=data["val"]  # For hyperparameter tuning
)

Final evaluation on held-out test set


evaluator = Evaluator(optimized, metrics={"accuracy": accuracy_metric})
test_results = evaluator.evaluate(data["test"])

Metric Design


python

Design metrics aligned with business goals


def business_aligned_metric(example, prediction, trace=None):
    """Metric aligned with business KPIs."""
    # Core correctness (must have)
    correct = example.answer.lower() in prediction.answer.lower()
    if not correct:
        return 0.0

    # Business-specific criteria
    is_concise = len(prediction.answer) < 100  # User preference
    is_professional = not any(
        word in prediction.answer.lower()
        for word in ["um", "like", "maybe", "dunno"]
    )
    has_confidence = (
        hasattr(prediction, 'confidence') and
        float(prediction.confidence) > 0.7
    )

    # Weighted score
    score = (
        correct * 1.0 +
        is_concise * 0.2 +
        is_professional * 0.3 +
        has_confidence * 0.2
    ) / 1.7

    return score
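To see how the weights play out: a correct, concise, professional answer with no confidence field scores (1.0 + 0.2 + 0.3 + 0.0) / 1.7 ≈ 0.88. A self-contained check of that arithmetic, using `SimpleNamespace` as a stand-in for DSPy's example/prediction objects:

```python
from types import SimpleNamespace

example = SimpleNamespace(answer="Paris")
prediction = SimpleNamespace(answer="The capital of France is Paris.")

# Mirror the metric's checks on this one case.
correct = example.answer.lower() in prediction.answer.lower()         # True
is_concise = len(prediction.answer) < 100                             # True
is_professional = not any(w in prediction.answer.lower()
                          for w in ["um", "like", "maybe", "dunno"])  # True
has_confidence = hasattr(prediction, "confidence")                    # False: no field set

score = (correct * 1.0 + is_concise * 0.2 +
         is_professional * 0.3 + has_confidence * 0.2) / 1.7
print(round(score, 2))  # 0.88
```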

Error Handling


python
import logging

class RobustModule(dspy.Module):
    """Module with error handling."""

    def __init__(self):
        super().__init__()
        self.qa = dspy.ChainOfThought("question -> answer")

    def forward(self, question, max_retries=3):
        """Forward with retry logic."""
        for attempt in range(max_retries):
            try:
                result = self.qa(question=question)

                # Validate output
                if self._validate_output(result):
                    return result
                else:
                    logging.warning(f"Invalid output on attempt {attempt + 1}")

            except Exception as e:
                logging.error(f"Error on attempt {attempt + 1}: {e}")

                if attempt == max_retries - 1:
                    raise

        # Fallback
        return dspy.Prediction(
            answer="I'm unable to answer that question.",
            confidence=0.0
        )

    def _validate_output(self, result):
        """Validate output quality."""
        return (
            hasattr(result, 'answer') and
            len(result.answer) > 0 and
            len(result.answer) < 1000
        )

Caching for Efficiency


python
import hashlib
import logging

class CachedModule(dspy.Module):
    """Module with semantic caching."""

    def __init__(self, base_module):
        super().__init__()
        self.base_module = base_module
        self.cache = {}

    def forward(self, question):
        # Check cache
        cache_key = self._get_cache_key(question)

        if cache_key in self.cache:
            logging.info("Cache hit")
            return self.cache[cache_key]

        # Cache miss: execute module
        result = self.base_module(question=question)
        self.cache[cache_key] = result

        return result

    def _get_cache_key(self, question):
        """Generate cache key."""
        return hashlib.md5(question.lower().encode()).hexdigest()
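One consequence of lowercasing before hashing is that queries differing only in letter case share a cache entry, while any other wording change misses. A standalone check of that key scheme:

```python
import hashlib

def get_cache_key(question):
    # Same scheme as CachedModule: lowercase, then MD5 the bytes.
    return hashlib.md5(question.lower().encode()).hexdigest()

same = get_cache_key("What is DSPy?") == get_cache_key("WHAT IS DSPY?")
different = get_cache_key("What is DSPy?") != get_cache_key("What is DSPy ?")
print(same, different)  # True True
```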

Use cached module


base_qa = dspy.ChainOfThought("question -> answer")
cached_qa = CachedModule(base_qa)

Troubleshooting


Common Issues


Low Optimization Performance:
  • Increase training data size (aim for 100+ examples)
  • Use a better-quality, more specific metric
  • Try a different optimizer (`auto="heavy"` for MIPROv2)
  • Check for data leakage in the metric
Optimization Takes Too Long:
  • Use `auto="light"` instead of `"heavy"`
  • Reduce `num_trials` for MIPROv2
  • Use BootstrapFewShot instead of MIPROv2 for quick iteration
  • Parallelize with the `num_threads` parameter
Inconsistent Results:
  • Set a random seed: `dspy.configure(random_seed=42)`
  • Increase temperature for diversity, or decrease it for consistency
  • Use an ensemble of multiple optimized programs
  • Validate on a larger test set
Out of Memory:
  • Reduce batch size during optimization
  • Use streaming for large datasets
  • Clear the cache periodically
  • Use a smaller model for bootstrapping
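The ensemble suggestion above can be as simple as majority voting across several optimized programs. A plain-Python sketch (the lambdas are hypothetical stand-ins for compiled DSPy programs):

```python
from collections import Counter

def majority_vote(programs, question):
    # Run every program on the same question and keep the most common answer.
    answers = [program(question) for program in programs]
    return Counter(answers).most_common(1)[0][0]

# Hypothetical stand-ins: two programs agree, one dissents.
programs = [lambda q: "Paris", lambda q: "Paris", lambda q: "Lyon"]
print(majority_vote(programs, "Capital of France?"))  # Paris
```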

Debugging Optimization


python

Enable verbose logging


import logging

logging.basicConfig(level=logging.INFO)

Custom teleprompter with debugging


class DebugTeleprompter:
    def __init__(self, metric):
        self.metric = metric
        self.history = []

    def compile(self, student, trainset):
        print(f"\nStarting optimization with {len(trainset)} examples")

        # Bootstrap with debugging
        bootstrap = BootstrapFewShot(metric=self.metric)

        for i, example in enumerate(trainset):
            prediction = student(**example.inputs())
            score = self.metric(example, prediction)

            self.history.append({
                "example_idx": i,
                "score": score,
                "prediction": str(prediction)
            })

            print(f"Example {i}: score={score}")

        # Continue with optimization
        optimized = bootstrap.compile(student, trainset=trainset)

        print("\nOptimization complete")
        print(f"Average score: {sum(h['score'] for h in self.history) / len(self.history):.2f}")

        return optimized

Use debug teleprompter


debug_optimizer = DebugTeleprompter(metric=accuracy_metric)
optimized = debug_optimizer.compile(qa_module, trainset=examples)

Performance Benchmarks


Based on 2025 production studies:
| Use Case | Baseline | DSPy Optimized | Improvement | Optimizer Used |
|---|---|---|---|---|
| Prompt Evaluation | 46.2% | 64.0% | +38.5% | MIPROv2 |
| Guardrail Enforcement | 72.1% | 84.3% | +16.9% | MIPROv2 |
| Code Generation | 58.4% | 71.2% | +21.9% | MIPROv2 |
| Hallucination Detection | 65.8% | 79.5% | +20.8% | BootstrapFewShot |
| Agent Routing | 69.3% | 82.1% | +18.5% | MIPROv2 |
| RAG Accuracy | 54.0% | 68.5% | +26.9% | BootstrapFewShot + MIPRO |
Production Adopters: JetBlue, Databricks, Walmart, VMware, Replit, Sephora, Moody's

Resources


  • Documentation: https://dspy.ai/
  • GitHub: https://github.com/stanfordnlp/dspy
  • Paper: "DSPy: Compiling Declarative Language Model Calls into Self-Improving Pipelines"
  • 2025 Study: "Is It Time To Treat Prompts As Code?" (arXiv:2507.03620)
  • Community: Discord, GitHub Discussions

Related Skills


When using DSPy, these skills enhance your workflow:
  • langgraph: LangGraph for multi-agent orchestration (use with DSPy-optimized prompts)
  • test-driven-development: Testing DSPy modules and prompt optimizations
  • systematic-debugging: Debugging DSPy compilation and optimization failures
[Full documentation available in these skills if deployed in your bundle]