DSPy Framework
progressive_disclosure:
  entry_point:
    summary: "Declarative framework for automatic prompt optimization treating prompts as code"
    when_to_use:
      - "When optimizing prompts systematically with evaluation data"
      - "When building production LLM systems requiring accuracy improvements"
      - "When implementing RAG, classification, or structured extraction tasks"
      - "When version-controlled, reproducible prompts are needed"
    quick_start:
      - "pip install dspy-ai"
      - "Define signature: class QA(dspy.Signature): question = dspy.InputField(); answer = dspy.OutputField()"
      - "Create module: qa = dspy.ChainOfThought(QA)"
      - "Optimize: optimizer.compile(qa, trainset=examples)"
    token_estimate:
      entry: 75
      full: 5500
Core Philosophy
DSPy (Declarative Self-improving Python) shifts focus from manual prompt engineering to programming language models. Treat prompts as code with:
- Declarative signatures defining inputs/outputs
- Automatic optimization via compilers
- Version control and systematic testing
- Reproducible results across model changes
Key Principle: Don't write prompts manually—define task specifications and let DSPy optimize them.
Core Concepts
Signatures: Defining Task Interfaces
Signatures specify what your LM module should do (inputs → outputs) without saying how.
**Basic Signature**:
```python
import dspy

# Inline signature (quick)
qa_module = dspy.ChainOfThought("question -> answer")

# Class-based signature (recommended for production)
class QuestionAnswer(dspy.Signature):
    """Answer questions with short factual answers."""
    question = dspy.InputField()
    answer = dspy.OutputField(desc="often between 1 and 5 words")

# Use signature
qa = dspy.ChainOfThought(QuestionAnswer)
response = qa(question="What is the capital of France?")
print(response.answer)  # "Paris"
```
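Inline signatures are plain `"inputs -> outputs"` strings. As a library-free illustration of that convention (a hypothetical `parse_signature` helper, not DSPy's internal parser), such a string splits into input and output field names:

```python
def parse_signature(sig: str):
    """Split an 'a, b -> c' style signature into input and output field names."""
    inputs, outputs = sig.split("->")
    return ([name.strip() for name in inputs.split(",")],
            [name.strip() for name in outputs.split(",")])

print(parse_signature("context, question -> answer"))
# (['context', 'question'], ['answer'])
```

DSPy's real parser also handles type annotations; this sketch only covers the plain comma-separated form.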
**Advanced Signatures with Type Hints**:
```python
from typing import List

class DocumentSummary(dspy.Signature):
    """Generate concise document summaries."""
    document: str = dspy.InputField(desc="Full text to summarize")
    key_points: List[str] = dspy.OutputField(desc="3-5 bullet points")
    summary: str = dspy.OutputField(desc="2-3 sentence summary")
    sentiment: str = dspy.OutputField(desc="positive, negative, or neutral")

# Type hints provide strong typing and validation
summarizer = dspy.ChainOfThought(DocumentSummary)
result = summarizer(document="Long document text...")
```

**Field Descriptions**:
- Short, descriptive phrases (not full sentences)
- Examples: `desc="often between 1 and 5 words"`, `desc="JSON format"`
- Used by optimizers to improve prompt quality

Modules: Building Blocks
Modules are DSPy's reasoning patterns—replacements for manual prompt engineering.
**ChainOfThought (CoT)**:
```python
# Zero-shot reasoning
class Reasoning(dspy.Signature):
    """Solve complex problems step by step."""
    problem = dspy.InputField()
    solution = dspy.OutputField()

cot = dspy.ChainOfThought(Reasoning)
result = cot(problem="Roger has 5 tennis balls. He buys 2 cans of 3 balls each. How many total?")
print(result.solution)   # Includes reasoning steps automatically
print(result.rationale)  # Access the chain-of-thought reasoning
```
**Retrieve Module (RAG)**:
```python
class RAGSignature(dspy.Signature):
    """Answer questions using retrieved context."""
    question = dspy.InputField()
    context = dspy.InputField(desc="relevant passages")
    answer = dspy.OutputField(desc="answer based on context")

# Combine retrieval + reasoning
retriever = dspy.Retrieve(k=3)  # Retrieve top 3 passages
rag = dspy.ChainOfThought(RAGSignature)

# Use in pipeline
question = "What is quantum entanglement?"
context = retriever(question).passages
answer = rag(question=question, context=context)
```
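At its core, retrieval is just top-k ranking of passages against the query. A library-free sketch of the idea using word-overlap scoring (a hypothetical `top_k` helper; real deployments use vector search, not this):

```python
import re

def tokens(text):
    """Lowercase alphanumeric tokens of a string."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def top_k(query, corpus, k=3):
    """Rank passages by word overlap with the query, best first."""
    q = tokens(query)
    return sorted(corpus, key=lambda passage: len(q & tokens(passage)), reverse=True)[:k]

corpus = [
    "Quantum entanglement links the states of two particles.",
    "Paris is the capital of France.",
    "Entanglement is a quantum phenomenon.",
]
print(top_k("What is quantum entanglement?", corpus, k=2))  # most relevant passages first
```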
**ReAct (Reasoning + Acting)**:
```python
class ResearchTask(dspy.Signature):
    """Research a topic using tools."""
    topic = dspy.InputField()
    findings = dspy.OutputField()

# ReAct interleaves reasoning with tool calls
react = dspy.ReAct(ResearchTask, tools=[web_search, calculator])
result = react(topic="Apple stock price change last month")
# Automatically uses tools when needed
```
**ProgramOfThought**:
```python
# Generate and execute Python code
class MathProblem(dspy.Signature):
    """Solve math problems by writing Python code."""
    problem = dspy.InputField()
    code = dspy.OutputField(desc="Python code to solve problem")
    result = dspy.OutputField(desc="final numerical answer")

pot = dspy.ProgramOfThought(MathProblem)
answer = pot(problem="Calculate compound interest on $1000 at 5% for 10 years")
```
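For the compound-interest prompt above, the generated code boils down to the standard formula A = P(1 + r)^n. A plain-Python sketch of the kind of code ProgramOfThought might emit (illustrative only, not actual DSPy output):

```python
# Compound interest: A = P * (1 + r) ** n
principal, rate, years = 1000.0, 0.05, 10
amount = principal * (1 + rate) ** years
interest = amount - principal
print(round(amount, 2), round(interest, 2))
# 1628.89 628.89
```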
**Custom Modules**:
```python
class MultiStepRAG(dspy.Module):
    """Custom module combining retrieval and reasoning."""
    def __init__(self, num_passages=3):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=num_passages)
        self.generate = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        # Retrieve relevant passages
        context = self.retrieve(question).passages
        # Generate answer with context
        prediction = self.generate(context=context, question=question)
        # Return with metadata
        return dspy.Prediction(
            answer=prediction.answer,
            context=context,
            rationale=prediction.rationale
        )

# Use custom module
rag = MultiStepRAG(num_passages=5)
optimized_rag = optimizer.compile(rag, trainset=examples)
```

Optimizers: Automatic Prompt Improvement
Optimizers compile your high-level program into optimized prompts or fine-tuned weights.
BootstrapFewShot
**Best For**: Small datasets (10-50 examples), quick optimization
**Optimizes**: Few-shot examples only
```python
from dspy.teleprompt import BootstrapFewShot

# Define metric function
def accuracy_metric(example, prediction, trace=None):
    """Evaluate prediction correctness."""
    return example.answer.lower() == prediction.answer.lower()

# Configure optimizer
optimizer = BootstrapFewShot(
    metric=accuracy_metric,
    max_bootstrapped_demos=4,  # Max examples to bootstrap
    max_labeled_demos=16,      # Max labeled examples to consider
    max_rounds=1,              # Bootstrapping rounds
    max_errors=10              # Stop after N errors
)

# Training examples
trainset = [
    dspy.Example(question="What is 2+2?", answer="4").with_inputs("question"),
    dspy.Example(question="Capital of France?", answer="Paris").with_inputs("question"),
    # ... more examples
]

# Compile program
qa_module = dspy.ChainOfThought("question -> answer")
optimized_qa = optimizer.compile(
    student=qa_module,
    trainset=trainset
)

# Save optimized program
optimized_qa.save("qa_optimized.json")
```

**How It Works**:
1. Uses your program to generate outputs on training data
2. Filters successful traces using your metric
3. Selects representative examples as demonstrations
4. Returns optimized program with best few-shot examples
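Because metrics are plain Python callables, they can be unit-tested without any LM calls. A minimal sketch using `SimpleNamespace` stand-ins for DSPy's `Example` and `Prediction` objects:

```python
from types import SimpleNamespace

def accuracy_metric(example, prediction, trace=None):
    """Evaluate prediction correctness."""
    return example.answer.lower() == prediction.answer.lower()

example = SimpleNamespace(answer="Paris")
print(accuracy_metric(example, SimpleNamespace(answer="paris")))   # True
print(accuracy_metric(example, SimpleNamespace(answer="London")))  # False
```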
BootstrapFewShotWithRandomSearch
**Best For**: Medium datasets (50-300 examples), better exploration
**Optimizes**: Few-shot examples with candidate exploration
```python
from dspy.teleprompt import BootstrapFewShotWithRandomSearch

config = dict(
    max_bootstrapped_demos=4,
    max_labeled_demos=4,
    num_candidate_programs=10,  # Explore 10 candidate programs
    num_threads=4               # Parallel optimization
)

optimizer = BootstrapFewShotWithRandomSearch(
    metric=accuracy_metric,
    **config
)

optimized_program = optimizer.compile(
    qa_module,
    trainset=training_examples,
    valset=validation_examples  # Optional validation set
)

# Compare candidates
print(f"Best program score: {optimizer.best_score}")
```

**Advantage**: Explores multiple candidate programs in parallel, selecting the best performer via random search.

MIPROv2 (State-of-the-Art 2025)
**Best For**: Large datasets (300+ examples), production systems
**Optimizes**: Instructions AND few-shot examples jointly via Bayesian optimization
```python
import dspy
from dspy.teleprompt import MIPROv2

# Initialize language model
lm = dspy.LM('openai/gpt-4o-mini', api_key='YOUR_API_KEY')
dspy.configure(lm=lm)

# Define comprehensive metric
def quality_metric(example, prediction, trace=None):
    """Multi-dimensional quality scoring."""
    correct = example.answer.lower() in prediction.answer.lower()
    reasonable_length = 10 < len(prediction.answer) < 200
    has_reasoning = hasattr(prediction, 'rationale') and len(prediction.rationale) > 20
    # Weighted composite score
    score = (
        correct * 1.0 +
        reasonable_length * 0.2 +
        has_reasoning * 0.3
    )
    return score / 1.5  # Normalize to [0, 1]

# Initialize MIPROv2 with auto-configuration
teleprompter = MIPROv2(
    metric=quality_metric,
    auto="medium",        # Options: "light", "medium", "heavy"
    num_candidates=10,    # Number of instruction candidates to explore
    init_temperature=1.0  # Temperature for instruction generation
)

# Optimize program
optimized_program = teleprompter.compile(
    dspy.ChainOfThought("question -> answer"),
    trainset=training_examples,
    num_trials=100,  # Bayesian optimization trials
    max_bootstrapped_demos=4,
    max_labeled_demos=8
)

# Save for production
optimized_program.save("production_qa_model.json")
```

**MIPROv2 Auto-Configuration Modes**:
- **`light`**: Fast optimization, ~20 trials, best for iteration (15-30 min)
- **`medium`**: Balanced optimization, ~50 trials, recommended default (30-60 min)
- **`heavy`**: Exhaustive search, ~100+ trials, highest quality (1-3 hours)

**How MIPROv2 Works**:
1. **Bootstrap Candidates**: Generates few-shot example candidates from training data
2. **Propose Instructions**: Creates instruction variations grounded in task dynamics
3. **Bayesian Optimization**: Uses a surrogate model to find optimal instruction + example combinations
4. **Joint Optimization**: Optimizes both components together (not separately) for synergy

**Performance Gains** (2025 Study):
- Prompt Evaluation: +38.5% accuracy (46.2% → 64.0%)
- Guardrail Enforcement: +16.9% accuracy (72.1% → 84.3%)
- Code Generation: +21.9% accuracy (58.4% → 71.2%)
- Hallucination Detection: +20.8% accuracy (65.8% → 79.5%)
- Agent Routing: +18.5% accuracy (69.3% → 82.1%)
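The `/ 1.5` normalization in `quality_metric` works because the maximum composite score is 1.0 + 0.2 + 0.3 = 1.5. Like any metric, it can be checked standalone with `SimpleNamespace` stand-ins (no LM needed):

```python
from types import SimpleNamespace

def quality_metric(example, prediction, trace=None):
    """Multi-dimensional quality scoring (restated from above)."""
    correct = example.answer.lower() in prediction.answer.lower()
    reasonable_length = 10 < len(prediction.answer) < 200
    has_reasoning = hasattr(prediction, 'rationale') and len(prediction.rationale) > 20
    return (correct * 1.0 + reasonable_length * 0.2 + has_reasoning * 0.3) / 1.5

ex = SimpleNamespace(answer="paris")
pred = SimpleNamespace(answer="The capital of France is Paris.",
                       rationale="France's capital has been Paris for centuries.")
print(quality_metric(ex, pred))  # 1.0 -> all three criteria met
```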
KNN Few-Shot Selector
**Best For**: Dynamic example selection based on query similarity
```python
from dspy.teleprompt import KNNFewShot

# Requires embeddings for examples
knn_optimizer = KNNFewShot(
    k=3,  # Select 3 most similar examples
    trainset=training_examples
)
optimized_program = knn_optimizer.compile(qa_module)

# Automatically selects relevant examples at inference time:
# Math query -> retrieves math examples
# Geography query -> retrieves geography examples
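Under the hood this amounts to nearest-neighbor search over example embeddings. A library-free sketch of the idea with toy 2-D vectors (a hypothetical `knn_select` helper, not DSPy's implementation):

```python
import math

def cosine(a, b):
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def knn_select(query_vec, bank, k=2):
    """bank: list of (embedding, example) pairs; returns the k nearest examples."""
    ranked = sorted(bank, key=lambda pair: cosine(query_vec, pair[0]), reverse=True)
    return [example for _, example in ranked[:k]]

bank = [
    ([1.0, 0.0], "math example"),
    ([0.9, 0.1], "arithmetic example"),
    ([0.0, 1.0], "geography example"),
]
print(knn_select([1.0, 0.05], bank, k=2))  # ['math example', 'arithmetic example']
```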
SignatureOptimizer
**Best For**: Optimizing signature descriptions and field specifications
```python
from dspy.teleprompt import SignatureOptimizer

sig_optimizer = SignatureOptimizer(
    metric=accuracy_metric,
    breadth=10,  # Number of variations to generate
    depth=3      # Optimization iterations
)

optimized_signature = sig_optimizer.compile(
    initial_signature=QuestionAnswer,
    trainset=trainset
)

# Use optimized signature
qa = dspy.ChainOfThought(optimized_signature)
```

Sequential Optimization Strategy
Combine optimizers for best results:
```python
# Step 1: Bootstrap few-shot examples (fast)
bootstrap = dspy.BootstrapFewShot(metric=accuracy_metric)
bootstrapped_program = bootstrap.compile(qa_module, trainset=train_examples)

# Step 2: Optimize instructions with MIPRO (comprehensive)
mipro = dspy.MIPROv2(metric=accuracy_metric, auto="medium")
final_program = mipro.compile(
    bootstrapped_program,
    trainset=train_examples,
    num_trials=50
)

# Step 3: Fine-tune signature descriptions
sig_optimizer = dspy.SignatureOptimizer(metric=accuracy_metric)
production_program = sig_optimizer.compile(final_program, trainset=train_examples)

# Save production model
production_program.save("production_optimized.json")
```

Teleprompters: Compilation Pipelines
Teleprompters orchestrate the optimization process ("teleprompter" is the legacy name for what are now called optimizers).
**Custom Teleprompter**:
```python
class CustomTeleprompter:
    """Custom optimization pipeline."""
    def __init__(self, metric):
        self.metric = metric

    def compile(self, student, trainset, valset=None):
        # Stage 1: Bootstrap examples
        bootstrap = BootstrapFewShot(metric=self.metric)
        stage1 = bootstrap.compile(student, trainset=trainset)
        # Stage 2: Optimize instructions
        mipro = MIPROv2(metric=self.metric, auto="light")
        stage2 = mipro.compile(stage1, trainset=trainset)
        # Stage 3: Validate on held-out set
        if valset:
            score = self._evaluate(stage2, valset)
            print(f"Validation score: {score:.2%}")
        return stage2

    def _evaluate(self, program, dataset):
        correct = 0
        for example in dataset:
            prediction = program(**example.inputs())
            if self.metric(example, prediction):
                correct += 1
        return correct / len(dataset)

# Use custom teleprompter
custom_optimizer = CustomTeleprompter(metric=accuracy_metric)
optimized = custom_optimizer.compile(
    student=qa_module,
    trainset=train_examples,
    valset=val_examples
)
```

Metrics and Evaluation
Custom Metrics
**Binary Accuracy**:
```python
def exact_match(example, prediction, trace=None):
    """Exact match metric."""
    return example.answer.lower().strip() == prediction.answer.lower().strip()
```

**Fuzzy Matching**:
```python
from difflib import SequenceMatcher

def fuzzy_match(example, prediction, trace=None):
    """Fuzzy string matching."""
    similarity = SequenceMatcher(
        None,
        example.answer.lower(),
        prediction.answer.lower()
    ).ratio()
    return similarity > 0.8  # 80% similarity threshold
```

**Multi-Criteria**:
```python
def comprehensive_metric(example, prediction, trace=None):
    """Evaluate on multiple criteria."""
    # Correctness
    correct = example.answer.lower() in prediction.answer.lower()
    # Length appropriateness
    length_ok = 10 < len(prediction.answer) < 200
    # Has reasoning (if CoT)
    has_reasoning = (
        hasattr(prediction, 'rationale') and
        len(prediction.rationale) > 30
    )
    # Citation quality (if RAG)
    has_citations = (
        hasattr(prediction, 'context') and
        len(prediction.context) > 0
    )
    # Composite score
    score = sum([
        correct * 1.0,
        length_ok * 0.2,
        has_reasoning * 0.3,
        has_citations * 0.2
    ]) / 1.7
    return score
```

**LLM-as-Judge**:
```python
def llm_judge_metric(example, prediction, trace=None):
    """Use an LLM to evaluate quality."""
    judge_prompt = f"""
    Question: {example.question}
    Expected Answer: {example.answer}
    Predicted Answer: {prediction.answer}
    Evaluate the predicted answer on a scale of 0-10 for:
    1. Correctness
    2. Completeness
    3. Clarity
    Return only a number 0-10.
    """
    judge_lm = dspy.LM('openai/gpt-4o-mini')
    response = judge_lm(judge_prompt)[0]  # LM calls return a list of completions
    score = float(response.strip()) / 10.0
    return score > 0.7  # Pass if score > 7/10
```
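`SequenceMatcher.ratio()` returns 2M/T, where M is the number of matched characters and T the combined length of both strings, so identical strings score 1.0. A quick standalone check of the fuzzy metric with `SimpleNamespace` stand-ins:

```python
from difflib import SequenceMatcher
from types import SimpleNamespace

def fuzzy_match(example, prediction, trace=None):
    """Fuzzy string matching (restated from above)."""
    similarity = SequenceMatcher(
        None,
        example.answer.lower(),
        prediction.answer.lower()
    ).ratio()
    return similarity > 0.8

ex = SimpleNamespace(answer="New York City")
print(fuzzy_match(ex, SimpleNamespace(answer="new york city")))  # True (case-insensitive)
print(fuzzy_match(ex, SimpleNamespace(answer="Los Angeles")))    # False
```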
Evaluation Pipeline
```python
class Evaluator:
    """Comprehensive evaluation system."""
    def __init__(self, program, metrics):
        self.program = program
        self.metrics = metrics

    def evaluate(self, dataset, verbose=True):
        """Evaluate program on dataset."""
        results = {name: [] for name in self.metrics.keys()}
        for example in dataset:
            prediction = self.program(**example.inputs())
            for metric_name, metric_fn in self.metrics.items():
                score = metric_fn(example, prediction)
                results[metric_name].append(score)
        # Aggregate results
        aggregated = {
            name: sum(scores) / len(scores)
            for name, scores in results.items()
        }
        if verbose:
            print("\nEvaluation Results:")
            print("=" * 50)
            for name, score in aggregated.items():
                print(f"{name:20s}: {score:.2%}")
        return aggregated

# Use evaluator
evaluator = Evaluator(
    program=optimized_qa,
    metrics={
        "accuracy": exact_match,
        "fuzzy_match": fuzzy_match,
        "quality": comprehensive_metric
    }
)
scores = evaluator.evaluate(test_dataset)
```
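Since the evaluator only needs callables, it can be smoke-tested with a stub program in place of a compiled DSPy module. A condensed restatement with hypothetical stand-in objects:

```python
from types import SimpleNamespace

class Evaluator:
    """Condensed restatement of the Evaluator above."""
    def __init__(self, program, metrics):
        self.program = program
        self.metrics = metrics

    def evaluate(self, dataset, verbose=False):
        results = {name: [] for name in self.metrics}
        for example in dataset:
            prediction = self.program(**example.inputs())
            for name, fn in self.metrics.items():
                results[name].append(fn(example, prediction))
        return {name: sum(scores) / len(scores) for name, scores in results.items()}

def make_example(question, answer):
    ex = SimpleNamespace(question=question, answer=answer)
    ex.inputs = lambda: {"question": question}
    return ex

# Stub program that only knows one answer
stub = lambda question: SimpleNamespace(answer="4" if "2+2" in question else "unknown")
exact = lambda ex, pred, trace=None: ex.answer == pred.answer

data = [make_example("What is 2+2?", "4"), make_example("Capital of France?", "Paris")]
print(Evaluator(stub, {"accuracy": exact}).evaluate(data))  # {'accuracy': 0.5}
```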
Language Model Configuration
Supported Providers
**OpenAI**:
```python
import dspy

lm = dspy.LM('openai/gpt-4o', api_key='YOUR_API_KEY')
dspy.configure(lm=lm)

# With custom settings
lm = dspy.LM(
    'openai/gpt-4o-mini',
    api_key='YOUR_API_KEY',
    temperature=0.7,
    max_tokens=1024
)
```
**Anthropic Claude**:
```python
lm = dspy.LM(
    'anthropic/claude-3-5-sonnet-20241022',
    api_key='YOUR_ANTHROPIC_KEY',
    max_tokens=4096
)
dspy.configure(lm=lm)

# Claude Opus for complex reasoning
lm_opus = dspy.LM('anthropic/claude-3-opus-20240229', api_key=key)
```
**Local Models (Ollama)**:
```python
# Requires Ollama running locally
lm = dspy.LM('ollama/llama3.1:70b', api_base='http://localhost:11434')
dspy.configure(lm=lm)

# Mixtral
lm = dspy.LM('ollama/mixtral:8x7b')
```
**Multiple Models**:
```python
# Use different models for different stages
strong_lm = dspy.LM('openai/gpt-4o')
fast_lm = dspy.LM('openai/gpt-4o-mini')

# Configure per module
class HybridPipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        # Fast model for retrieval
        self.retrieve = dspy.Retrieve(k=5)
        # Strong model for reasoning
        with dspy.context(lm=strong_lm):
            self.reason = dspy.ChainOfThought("context, question -> answer")

    def forward(self, question):
        context = self.retrieve(question).passages
        return self.reason(context=context, question=question)
```

Model Selection Strategy
```python
def select_model(task_complexity, budget):
    """Select appropriate model based on task and budget."""
    models = {
        "simple": [
            ("openai/gpt-4o-mini", 0.15),  # (model, cost per 1M tokens)
            ("anthropic/claude-3-haiku-20240307", 0.25),
        ],
        "medium": [
            ("openai/gpt-4o", 2.50),
            ("anthropic/claude-3-5-sonnet-20241022", 3.00),
        ],
        "complex": [
            ("anthropic/claude-3-opus-20240229", 15.00),
            ("openai/o1-preview", 15.00),
        ]
    }
    candidates = models[task_complexity]
    affordable = [m for m, cost in candidates if cost <= budget]
    return affordable[0] if affordable else candidates[0][0]

# Use in optimization
task = "complex"
model = select_model(task, budget=10.0)
lm = dspy.LM(model)
dspy.configure(lm=lm)
```
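`select_model` can be sanity-checked directly: with `budget=10.0`, no "complex" model is affordable, so it falls back to the first candidate. A condensed restatement for a standalone check:

```python
# Condensed restatement of select_model above
def select_model(task_complexity, budget):
    models = {
        "simple": [("openai/gpt-4o-mini", 0.15),
                   ("anthropic/claude-3-haiku-20240307", 0.25)],
        "complex": [("anthropic/claude-3-opus-20240229", 15.00),
                    ("openai/o1-preview", 15.00)],
    }
    candidates = models[task_complexity]
    affordable = [m for m, cost in candidates if cost <= budget]
    return affordable[0] if affordable else candidates[0][0]

print(select_model("simple", budget=1.0))    # openai/gpt-4o-mini
print(select_model("complex", budget=10.0))  # anthropic/claude-3-opus-20240229 (fallback)
```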
Program Composition
Chaining Modules
```python
class MultiStepPipeline(dspy.Module):
    """Chain multiple reasoning steps."""
    def __init__(self):
        super().__init__()
        self.step1 = dspy.ChainOfThought("question -> subtasks")
        self.step2 = dspy.ChainOfThought("subtask -> solution")
        self.step3 = dspy.ChainOfThought("solutions -> final_answer")

    def forward(self, question):
        # Break down question
        decomposition = self.step1(question=question)
        # Solve each subtask
        solutions = []
        for subtask in decomposition.subtasks.split('\n'):
            if subtask.strip():
                sol = self.step2(subtask=subtask)
                solutions.append(sol.solution)
        # Synthesize final answer
        combined = '\n'.join(solutions)
        final = self.step3(solutions=combined)
        return dspy.Prediction(
            answer=final.final_answer,
            subtasks=decomposition.subtasks,
            solutions=solutions
        )

# Optimize entire pipeline
pipeline = MultiStepPipeline()
optimizer = MIPROv2(metric=quality_metric, auto="medium")
optimized_pipeline = optimizer.compile(pipeline, trainset=examples)
```

Conditional Branching
python
class AdaptivePipeline(dspy.Module):
"""Adapt reasoning based on query type."""
def __init__(self):
super().__init__()
self.classifier = dspy.ChainOfThought("question -> category")
self.math_solver = dspy.ProgramOfThought("problem -> solution")
self.fact_qa = dspy.ChainOfThought("question -> answer")
self.creative = dspy.ChainOfThought("prompt -> response")
def forward(self, question):
# Classify query type
category = self.classifier(question=question).category.lower()
# Route to appropriate module
if "math" in category or "calculation" in category:
return self.math_solver(problem=question)
elif "creative" in category or "story" in category:
return self.creative(prompt=question)
else:
return self.fact_qa(question=question)python
class AdaptivePipeline(dspy.Module):
"""根据查询类型调整推理方式。"""
def __init__(self):
super().__init__()
self.classifier = dspy.ChainOfThought("question -> category")
self.math_solver = dspy.ProgramOfThought("problem -> solution")
self.fact_qa = dspy.ChainOfThought("question -> answer")
self.creative = dspy.ChainOfThought("prompt -> response")
def forward(self, question):
# 分类查询类型
category = self.classifier(question=question).category.lower()
# 路由到合适的模块
if "math" in category or "calculation" in category:
return self.math_solver(problem=question)
elif "creative" in category or "story" in category:
return self.creative(prompt=question)
else:
return self.fact_qa(question=question)Optimize each branch independently
独立优化每个分支
adaptive = AdaptivePipeline()
optimized_adaptive = optimizer.compile(adaptive, trainset=diverse_examples)
undefinedadaptive = AdaptivePipeline()
optimized_adaptive = optimizer.compile(adaptive, trainset=diverse_examples)
## Production Deployment

### Saving and Loading Models

```python
# Save optimized program
optimized_program.save("models/qa_v1.0.0.json")

# Load in production
production_qa = dspy.ChainOfThought("question -> answer")
production_qa.load("models/qa_v1.0.0.json")

# Use loaded model
response = production_qa(question="What is quantum computing?")
```
### Version Control

```python
import json
from datetime import datetime


class ModelRegistry:
    """Version control for DSPy models."""

    def __init__(self, registry_path="models/registry.json"):
        self.registry_path = registry_path
        self.registry = self._load_registry()

    def register(self, name, version, model_path, metadata=None):
        """Register a model version."""
        model_id = f"{name}:v{version}"
        self.registry[model_id] = {
            "name": name,
            "version": version,
            "path": model_path,
            "created_at": datetime.utcnow().isoformat(),
            "metadata": metadata or {}
        }
        self._save_registry()
        return model_id

    def get_model(self, name, version="latest"):
        """Load a model by name and version."""
        if version == "latest":
            versions = [
                v for k, v in self.registry.items()
                if v["name"] == name
            ]
            if not versions:
                raise ValueError(f"No versions found for {name}")
            latest = max(versions, key=lambda x: x["created_at"])
            model_path = latest["path"]
        else:
            model_id = f"{name}:v{version}"
            model_path = self.registry[model_id]["path"]

        # Load model
        module = dspy.ChainOfThought("question -> answer")
        module.load(model_path)
        return module

    def _load_registry(self):
        try:
            with open(self.registry_path, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def _save_registry(self):
        with open(self.registry_path, 'w') as f:
            json.dump(self.registry, f, indent=2)

# Use registry
registry = ModelRegistry()

# Register new version
registry.register(
    name="qa_assistant",
    version="1.0.0",
    model_path="models/qa_v1.0.0.json",
    metadata={
        "accuracy": 0.87,
        "optimizer": "MIPROv2",
        "training_examples": 500
    }
)

# Load for production
qa = registry.get_model("qa_assistant", version="latest")
```
### Monitoring and Logging

```python
import json
import logging
from datetime import datetime


class DSPyMonitor:
    """Monitor DSPy program execution."""

    def __init__(self, program, log_file="logs/dspy.log"):
        self.program = program
        self.logger = self._setup_logger(log_file)
        self.metrics = []

    def __call__(self, **kwargs):
        """Wrap program execution with monitoring."""
        start_time = datetime.utcnow()
        try:
            # Execute program
            result = self.program(**kwargs)

            # Log success
            duration = (datetime.utcnow() - start_time).total_seconds()
            self._log_execution(
                status="success",
                inputs=kwargs,
                outputs=result,
                duration=duration
            )
            return result
        except Exception as e:
            # Log error
            duration = (datetime.utcnow() - start_time).total_seconds()
            self._log_execution(
                status="error",
                inputs=kwargs,
                error=str(e),
                duration=duration
            )
            raise

    def _log_execution(self, status, inputs, duration, outputs=None, error=None):
        """Log execution details."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "status": status,
            "inputs": inputs,
            "duration_seconds": duration
        }
        if outputs:
            log_entry["outputs"] = str(outputs)
        if error:
            log_entry["error"] = error
        self.logger.info(json.dumps(log_entry, default=str))
        self.metrics.append(log_entry)

    def _setup_logger(self, log_file):
        """Set up logging."""
        logger = logging.getLogger("dspy_monitor")
        logger.setLevel(logging.INFO)
        handler = logging.FileHandler(log_file)
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(message)s')
        )
        logger.addHandler(handler)
        return logger

    def get_stats(self):
        """Get execution statistics."""
        if not self.metrics:
            return {}
        successes = [m for m in self.metrics if m["status"] == "success"]
        errors = [m for m in self.metrics if m["status"] == "error"]
        return {
            "total_calls": len(self.metrics),
            "success_rate": len(successes) / len(self.metrics),
            "error_rate": len(errors) / len(self.metrics),
            "avg_duration": sum(m["duration_seconds"] for m in self.metrics) / len(self.metrics),
            "errors": [m["error"] for m in errors]
        }

# Use monitor
monitored_qa = DSPyMonitor(optimized_qa)
result = monitored_qa(question="What is AI?")

# Check stats
stats = monitored_qa.get_stats()
print(f"Success rate: {stats['success_rate']:.2%}")
```
### Integration with LangSmith

Evaluate DSPy programs using LangSmith:

```python
import os
from langsmith import Client
from langsmith.evaluation import evaluate

# Setup
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-key"
client = Client()

# Wrap DSPy program for LangSmith
def dspy_wrapper(inputs: dict) -> dict:
    """Wrapper for LangSmith evaluation."""
    question = inputs["question"]
    result = optimized_qa(question=question)
    return {"answer": result.answer}

# Define evaluator
def dspy_evaluator(run, example):
    """Evaluate DSPy output."""
    predicted = run.outputs["answer"]
    expected = example.outputs["answer"]
    return {
        "key": "correctness",
        "score": 1.0 if expected.lower() in predicted.lower() else 0.0
    }

# Create dataset
dataset = client.create_dataset(
    dataset_name="dspy_qa_eval",
    description="DSPy QA evaluation dataset"
)

# Add examples
for example in test_examples:
    client.create_example(
        dataset_id=dataset.id,
        inputs={"question": example.question},
        outputs={"answer": example.answer}
    )

# Run evaluation
results = evaluate(
    dspy_wrapper,
    data="dspy_qa_eval",
    evaluators=[dspy_evaluator],
    experiment_prefix="dspy_v1.0"
)
print(f"Average correctness: {results['results']['correctness']:.2%}")
```
## Real-World Examples

### RAG Pipeline

```python
class ProductionRAG(dspy.Module):
    """Production-ready RAG system."""

    def __init__(self, k=5):
        super().__init__()
        self.retrieve = dspy.Retrieve(k=k)

        # Multi-stage reasoning
        self.rerank = dspy.ChainOfThought(
            "question, passages -> relevant_passages"
        )
        self.generate = dspy.ChainOfThought(
            "question, context -> answer, citations"
        )

    def forward(self, question):
        # Retrieve candidate passages
        candidates = self.retrieve(question).passages

        # Rerank for relevance
        reranked = self.rerank(
            question=question,
            passages="\n---\n".join(candidates)
        )

        # Generate answer with citations
        result = self.generate(
            question=question,
            context=reranked.relevant_passages
        )
        return dspy.Prediction(
            answer=result.answer,
            citations=result.citations,
            passages=candidates
        )

# Optimize RAG pipeline
rag = ProductionRAG(k=10)

def rag_metric(example, prediction, trace=None):
    """Evaluate RAG quality."""
    answer_correct = example.answer.lower() in prediction.answer.lower()
    has_citations = len(prediction.citations) > 0
    return answer_correct and has_citations

optimizer = MIPROv2(metric=rag_metric, auto="heavy")
optimized_rag = optimizer.compile(rag, trainset=rag_examples)
optimized_rag.save("models/rag_production.json")
```
### Classification

```python
class SentimentClassifier(dspy.Module):
    """Multi-class sentiment classification."""

    def __init__(self, classes):
        super().__init__()
        self.classes = classes

        class ClassificationSig(dspy.Signature):
            text = dspy.InputField()
            reasoning = dspy.OutputField(desc="step-by-step reasoning")
            sentiment = dspy.OutputField(desc=f"one of: {', '.join(classes)}")
            confidence = dspy.OutputField(desc="confidence score 0-1")

        self.classify = dspy.ChainOfThought(ClassificationSig)

    def forward(self, text):
        result = self.classify(text=text)

        # Validate output
        if result.sentiment not in self.classes:
            result.sentiment = "neutral"  # Fallback
        return result

# Train classifier
classes = ["positive", "negative", "neutral"]
classifier = SentimentClassifier(classes)

def classification_metric(example, prediction, trace=None):
    return example.sentiment == prediction.sentiment

optimizer = BootstrapFewShot(metric=classification_metric)
optimized_classifier = optimizer.compile(
    classifier,
    trainset=sentiment_examples
)

# Use in production
result = optimized_classifier(text="This product is amazing!")
print(f"Sentiment: {result.sentiment} ({result.confidence})")
```
### Summarization

```python
class DocumentSummarizer(dspy.Module):
    """Hierarchical document summarization."""

    def __init__(self):
        super().__init__()
        # Chunk-level summaries
        self.chunk_summary = dspy.ChainOfThought(
            "chunk -> summary"
        )
        # Document-level synthesis
        self.final_summary = dspy.ChainOfThought(
            "chunk_summaries -> final_summary, key_points"
        )

    def forward(self, document, chunk_size=1000):
        # Split document into chunks
        chunks = self._chunk_document(document, chunk_size)

        # Summarize each chunk
        chunk_summaries = []
        for chunk in chunks:
            summary = self.chunk_summary(chunk=chunk)
            chunk_summaries.append(summary.summary)

        # Synthesize final summary
        combined = "\n---\n".join(chunk_summaries)
        final = self.final_summary(chunk_summaries=combined)
        return dspy.Prediction(
            summary=final.final_summary,
            key_points=final.key_points.split('\n'),
            chunk_count=len(chunks)
        )

    def _chunk_document(self, document, chunk_size):
        """Split document into word-based chunks."""
        words = document.split()
        chunks = []
        for i in range(0, len(words), chunk_size):
            chunk = ' '.join(words[i:i + chunk_size])
            chunks.append(chunk)
        return chunks

# Optimize summarizer
summarizer = DocumentSummarizer()

def summary_metric(example, prediction, trace=None):
    # Check key points coverage
    key_points_present = sum(
        1 for kp in example.key_points
        if kp.lower() in prediction.summary.lower()
    )
    coverage = key_points_present / len(example.key_points)

    # Check length appropriateness
    length_ok = 100 < len(prediction.summary) < 500
    return coverage > 0.7 and length_ok

optimizer = MIPROv2(metric=summary_metric, auto="medium")
optimized_summarizer = optimizer.compile(summarizer, trainset=summary_examples)
```
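The word-based chunking used by `_chunk_document` is plain Python with no DSPy dependency, so it is easy to sanity-check in isolation. A standalone sketch of the same logic (the function name `chunk_document` is illustrative):

```python
def chunk_document(document: str, chunk_size: int) -> list:
    """Split a document into chunks of at most chunk_size words."""
    words = document.split()
    return [
        ' '.join(words[i:i + chunk_size])
        for i in range(0, len(words), chunk_size)
    ]

chunks = chunk_document("one two three four five", chunk_size=2)
assert chunks == ["one two", "three four", "five"]  # last chunk may be short
assert chunk_document("", chunk_size=3) == []       # empty document, no chunks
```

Note that a word-count chunker ignores sentence and paragraph boundaries; for summarization quality you may prefer splitting on structural boundaries, at the cost of less uniform chunk sizes.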
### Multi-Hop Question Answering

```python
class MultiHopQA(dspy.Module):
    """Multi-hop question answering."""

    def __init__(self):
        super().__init__()
        # Decompose complex questions
        self.decompose = dspy.ChainOfThought(
            "question -> subquestions"
        )
        # Answer subquestions with retrieval
        self.retrieve = dspy.Retrieve(k=3)
        self.answer_subq = dspy.ChainOfThought(
            "subquestion, context -> answer"
        )
        # Synthesize final answer
        self.synthesize = dspy.ChainOfThought(
            "question, subanswers -> final_answer, reasoning"
        )

    def forward(self, question):
        # Decompose into subquestions
        decomp = self.decompose(question=question)
        subquestions = [
            sq.strip()
            for sq in decomp.subquestions.split('\n')
            if sq.strip()
        ]

        # Answer each subquestion
        subanswers = []
        for subq in subquestions:
            context = self.retrieve(subq).passages
            answer = self.answer_subq(
                subquestion=subq,
                context="\n".join(context)
            )
            subanswers.append(answer.answer)

        # Synthesize final answer
        combined = "\n".join([
            f"Q: {sq}\nA: {sa}"
            for sq, sa in zip(subquestions, subanswers)
        ])
        final = self.synthesize(
            question=question,
            subanswers=combined
        )
        return dspy.Prediction(
            answer=final.final_answer,
            reasoning=final.reasoning,
            subquestions=subquestions,
            subanswers=subanswers
        )

# Optimize multi-hop QA
multihop_qa = MultiHopQA()

def multihop_metric(example, prediction, trace=None):
    # Check answer correctness
    correct = example.answer.lower() in prediction.answer.lower()
    # Check reasoning quality
    has_reasoning = len(prediction.reasoning) > 50
    # Check subquestion coverage
    has_subquestions = len(prediction.subquestions) >= 2
    return correct and has_reasoning and has_subquestions

optimizer = MIPROv2(metric=multihop_metric, auto="heavy")
optimized_multihop = optimizer.compile(multihop_qa, trainset=multihop_examples)
```
## Migration from Manual Prompting

### Before: Manual Prompting

```python
# Manual prompt engineering
PROMPT = """
You are a helpful assistant. Answer questions accurately and concisely.

Examples:
Q: What is 2+2?
A: 4

Q: Capital of France?
A: Paris

Q: {question}
A: """

def manual_qa(question):
    response = llm.invoke(PROMPT.format(question=question))
    return response
```

### After: DSPy

```python
# DSPy declarative approach
class QA(dspy.Signature):
    """Answer questions accurately and concisely."""
    question = dspy.InputField()
    answer = dspy.OutputField(desc="short factual answer")

qa = dspy.ChainOfThought(QA)

# Optimize automatically
optimizer = MIPROv2(metric=accuracy_metric, auto="medium")
optimized_qa = optimizer.compile(qa, trainset=examples)

def dspy_qa(question):
    result = optimized_qa(question=question)
    return result.answer
```

**Benefits**:
- Systematic optimization instead of manual trial-and-error
- Version control and reproducibility
- Automatic adaptation to new models
- Performance gains: +18-38% accuracy

## Best Practices
### Data Preparation

```python
# Create high-quality training examples
def prepare_training_data(raw_data):
    """Convert raw data to DSPy examples."""
    examples = []
    for item in raw_data:
        example = dspy.Example(
            question=item["question"],
            answer=item["answer"],
            context=item.get("context", "")  # Optional fields
        ).with_inputs("question", "context")  # Mark input fields
        examples.append(example)
    return examples

# Split data properly
def train_val_test_split(examples, train=0.7, val=0.15, test=0.15):
    """Split data for optimization and evaluation."""
    import random
    random.shuffle(examples)
    n = len(examples)
    train_end = int(n * train)
    val_end = int(n * (train + val))
    return {
        "train": examples[:train_end],
        "val": examples[train_end:val_end],
        "test": examples[val_end:]
    }

# Use split data
data = train_val_test_split(all_examples)
optimized = optimizer.compile(
    program,
    trainset=data["train"],
    valset=data["val"]  # For hyperparameter tuning
)

# Final evaluation on held-out test set
evaluator = Evaluator(optimized, metrics={"accuracy": accuracy_metric})
test_results = evaluator.evaluate(data["test"])
```
### Metric Design

```python
# Design metrics aligned with business goals
def business_aligned_metric(example, prediction, trace=None):
    """Metric aligned with business KPIs."""
    # Core correctness (must have)
    correct = example.answer.lower() in prediction.answer.lower()
    if not correct:
        return 0.0

    # Business-specific criteria
    is_concise = len(prediction.answer) < 100  # User preference
    is_professional = not any(
        word in prediction.answer.lower()
        for word in ["um", "like", "maybe", "dunno"]
    )
    has_confidence = (
        hasattr(prediction, 'confidence') and
        float(prediction.confidence) > 0.7
    )

    # Weighted score
    score = (
        correct * 1.0 +
        is_concise * 0.2 +
        is_professional * 0.3 +
        has_confidence * 0.2
    ) / 1.7
    return score
```
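Since the weights sum to 1.7, the score is bounded at (1.0 + 0.2 + 0.3 + 0.2) / 1.7 = 1.0 when every criterion passes, and gated to 0.0 on a wrong answer. A quick sanity check of the same metric, redefined here so it runs standalone and using `types.SimpleNamespace` purely as a stand-in for DSPy example/prediction objects:

```python
from types import SimpleNamespace

def business_aligned_metric(example, prediction, trace=None):
    correct = example.answer.lower() in prediction.answer.lower()
    if not correct:
        return 0.0
    is_concise = len(prediction.answer) < 100
    is_professional = not any(
        word in prediction.answer.lower()
        for word in ["um", "like", "maybe", "dunno"]
    )
    has_confidence = (
        hasattr(prediction, 'confidence') and
        float(prediction.confidence) > 0.7
    )
    return (correct * 1.0 + is_concise * 0.2 +
            is_professional * 0.3 + has_confidence * 0.2) / 1.7

example = SimpleNamespace(answer="Paris")
good = SimpleNamespace(answer="The capital is Paris.", confidence="0.9")
bad = SimpleNamespace(answer="Um, maybe London?", confidence="0.2")

assert abs(business_aligned_metric(example, good) - 1.0) < 1e-9  # all criteria pass
assert business_aligned_metric(example, bad) == 0.0              # wrong answer gates to 0
```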
### Error Handling
```python
import logging


class RobustModule(dspy.Module):
    """Module with error handling."""

    def __init__(self):
        super().__init__()
        self.qa = dspy.ChainOfThought("question -> answer")

    def forward(self, question, max_retries=3):
        """Forward with retry logic."""
        for attempt in range(max_retries):
            try:
                result = self.qa(question=question)

                # Validate output
                if self._validate_output(result):
                    return result
                logging.warning(f"Invalid output on attempt {attempt + 1}")
            except Exception as e:
                logging.error(f"Error on attempt {attempt + 1}: {e}")
                if attempt == max_retries - 1:
                    raise

        # Fallback
        return dspy.Prediction(
            answer="I'm unable to answer that question.",
            confidence=0.0
        )

    def _validate_output(self, result):
        """Validate output quality."""
        return (
            hasattr(result, 'answer') and
            len(result.answer) > 0 and
            len(result.answer) < 1000
        )
```

### Caching for Efficiency
```python
import hashlib
import logging


class CachedModule(dspy.Module):
    """Module with exact-match response caching."""

    def __init__(self, base_module):
        super().__init__()
        self.base_module = base_module
        self.cache = {}

    def forward(self, question):
        # Check cache
        cache_key = self._get_cache_key(question)
        if cache_key in self.cache:
            logging.info("Cache hit")
            return self.cache[cache_key]

        # Cache miss: execute module
        result = self.base_module(question=question)
        self.cache[cache_key] = result
        return result

    def _get_cache_key(self, question):
        """Generate cache key from the lowercased question."""
        return hashlib.md5(question.lower().encode()).hexdigest()

# Use cached module
base_qa = dspy.ChainOfThought("question -> answer")
cached_qa = CachedModule(base_qa)
```
## Troubleshooting

### Common Issues

**Low Optimization Performance**:
- Increase training data size (aim for 100+ examples)
- Use a better-quality, more specific metric
- Try a different optimizer setting (`auto="heavy"` for MIPROv2)
- Check for data leakage in the metric

**Optimization Takes Too Long**:
- Use `auto="light"` instead of `"heavy"`
- Reduce `num_trials` for MIPROv2
- Use BootstrapFewShot instead of MIPROv2 for quick iteration
- Parallelize with the `num_threads` parameter

**Inconsistent Results**:
- Set a random seed: `dspy.configure(random_seed=42)`
- Increase temperature for diversity, or decrease it for consistency
- Use an ensemble of multiple optimized programs
- Validate on a larger test set

**Out of Memory**:
- Reduce batch size during optimization
- Use streaming for large datasets
- Clear caches periodically
- Use a smaller model for bootstrapping
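Reproducibility extends to data handling as well: the `train_val_test_split` helper under Best Practices shuffles with the unseeded global RNG, so splits differ between runs. A seeded variant keeps splits stable (`seeded_split` is an illustrative name, not a DSPy API):

```python
import random

def seeded_split(examples, train=0.7, val=0.15, seed=42):
    """Deterministically shuffle and split examples into train/val/test."""
    items = list(examples)              # avoid mutating the caller's list
    random.Random(seed).shuffle(items)  # seeded RNG, isolated from the global one
    n = len(items)
    train_end = int(n * train)
    val_end = int(n * (train + val))
    return {
        "train": items[:train_end],
        "val": items[train_end:val_end],
        "test": items[val_end:],
    }

a = seeded_split(range(100))
b = seeded_split(range(100))
assert a["train"] == b["train"]  # same seed, same split across runs
```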
Debugging Optimization
优化调试
python
undefinedpython
undefinedEnable verbose logging
启用详细日志
import logging
logging.basicConfig(level=logging.INFO)
import logging
logging.basicConfig(level=logging.INFO)
Custom teleprompter with debugging
带调试功能的自定义提示词生成器
class DebugTeleprompter:
    def __init__(self, metric):
        self.metric = metric
        self.history = []

    def compile(self, student, trainset):
        print(f"\nStarting optimization with {len(trainset)} examples")
        # Bootstrap with debugging: score each example with the unoptimized student
        bootstrap = BootstrapFewShot(metric=self.metric)
        for i, example in enumerate(trainset):
            prediction = student(**example.inputs())
            score = self.metric(example, prediction)
            self.history.append({
                "example_idx": i,
                "score": score,
                "prediction": str(prediction)
            })
            print(f"Example {i}: score={score}")
        # Continue with optimization
        optimized = bootstrap.compile(student, trainset=trainset)
        print("\nOptimization complete")
        print(f"Average score: {sum(h['score'] for h in self.history) / len(self.history):.2f}")
        return optimized
Use debug teleprompter
debug_optimizer = DebugTeleprompter(metric=accuracy_metric)
optimized = debug_optimizer.compile(qa_module, trainset=examples)
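The `accuracy_metric` passed in above is assumed rather than defined in this section; a minimal exact-match version, following the (example, prediction, trace) metric shape DSPy optimizers expect, might look like:

```python
def accuracy_metric(example, prediction, trace=None):
    """Return 1.0 when the predicted answer exactly matches the gold
    answer (case- and whitespace-insensitive), else 0.0."""
    gold = example.answer.strip().lower()
    pred = prediction.answer.strip().lower()
    return 1.0 if gold == pred else 0.0
```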
Performance Benchmarks
Based on 2025 production studies:
| Use Case | Baseline | DSPy Optimized | Improvement | Optimizer Used |
|---|---|---|---|---|
| Prompt Evaluation | 46.2% | 64.0% | +38.5% | MIPROv2 |
| Guardrail Enforcement | 72.1% | 84.3% | +16.9% | MIPROv2 |
| Code Generation | 58.4% | 71.2% | +21.9% | MIPROv2 |
| Hallucination Detection | 65.8% | 79.5% | +20.8% | BootstrapFewShot |
| Agent Routing | 69.3% | 82.1% | +18.5% | MIPROv2 |
| RAG Accuracy | 54.0% | 68.5% | +26.9% | BootstrapFewShot + MIPRO |
Production Adopters: JetBlue, Databricks, Walmart, VMware, Replit, Sephora, Moody's
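Note that the Improvement column reports relative gain over the baseline, not the percentage-point difference. The prompt-evaluation row, for example:

```python
def relative_improvement(baseline_pct, optimized_pct):
    """Relative gain over the baseline, in percent."""
    return (optimized_pct - baseline_pct) / baseline_pct * 100

# Prompt Evaluation: 46.2% baseline -> 64.0% optimized
print(f"+{relative_improvement(46.2, 64.0):.1f}%")  # → +38.5%
```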
Resources
- Documentation: https://dspy.ai/
- GitHub: https://github.com/stanfordnlp/dspy
- Paper: "DSPy: Compiling Declarative Language Model Calls into Self-Improving Pipelines"
- 2025 Study: "Is It Time To Treat Prompts As Code?" (arXiv:2507.03620)
- Community: Discord, GitHub Discussions
Related Skills
When using DSPy, these skills enhance your workflow:
- langgraph: LangGraph for multi-agent orchestration (use with DSPy-optimized prompts)
- test-driven-development: Testing DSPy modules and prompt optimizations
- systematic-debugging: Debugging DSPy compilation and optimization failures
[Full documentation available in these skills if deployed in your bundle]