Loading...
Loading...
Chain multiple AI steps into one reliable pipeline. Use when your AI task is too complex for one prompt, you need to break AI logic into stages, combine classification then generation, do multi-step reasoning, build a compound AI system, orchestrate multiple models, or wire AI components together. Powered by DSPy multi-module pipelines.
npx skill4agent add lebsral/dspy-programming-not-prompting-lms-skills ai-building-pipelines

import dspy
class SupportPipeline(dspy.Module):
    """Three-stage support pipeline: classify the ticket, retrieve docs, draft a reply."""

    def __init__(self):
        super().__init__()  # register sub-modules with DSPy
        self.classify = dspy.ChainOfThought(ClassifyTicket)
        self.retrieve = dspy.Retrieve(k=3)
        self.draft = dspy.ChainOfThought(DraftResponse)

    def forward(self, ticket):
        # Stage 1: Classify
        classification = self.classify(ticket=ticket)
        # Stage 2: Retrieve relevant docs (category prefixes the query text)
        docs = self.retrieve(classification.category + " " + ticket).passages
        # Stage 3: Draft response using classification + docs
        return self.draft(
            ticket=ticket,
            category=classification.category,
            context=docs,
        )


from typing import Literal
# Closed set of ticket categories the classifier may choose from.
CATEGORIES = ["billing", "technical", "account", "general"]


class ClassifyTicket(dspy.Signature):
    """Classify the support ticket."""

    ticket: str = dspy.InputField()
    # Literal is built dynamically so the category list lives in one place.
    category: Literal[tuple(CATEGORIES)] = dspy.OutputField()
class DraftResponse(dspy.Signature):
    """Draft a helpful response to the support ticket."""

    ticket: str = dspy.InputField()
    category: str = dspy.InputField()
    context: list[str] = dspy.InputField(desc="Relevant help articles")
    response: str = dspy.OutputField(desc="Professional support response")
class RoutedPipeline(dspy.Module):
    """Route a question to a category-specific handler module."""

    def __init__(self):
        super().__init__()  # register sub-modules with DSPy
        self.classify = dspy.ChainOfThought(ClassifyInput)
        self.handlers = {
            "simple": dspy.Predict(SimpleAnswer),
            "complex": dspy.ChainOfThought(DetailedAnswer),
            "research": dspy.ChainOfThought(ResearchAnswer),
        }

    def forward(self, question):
        category = self.classify(question=question).category
        # Unknown categories fall back to the simple handler.
        handler = self.handlers.get(category, self.handlers["simple"])
        return handler(question=question)
class GenerateAndRefine(dspy.Module):
    """Generate a draft, verify its quality, and refine once if the check fails."""

    def __init__(self):
        super().__init__()  # register sub-modules with DSPy
        self.generate = dspy.ChainOfThought(GenerateDraft)
        self.verify = dspy.ChainOfThought(CheckQuality)
        self.refine = dspy.ChainOfThought(ImproveDraft)

    def forward(self, task):
        # Stage 1: Generate
        draft = self.generate(task=task)
        # Stage 2: Verify
        check = self.verify(task=task, draft=draft.output)
        # Stage 3: Refine only when verification failed.
        # NOTE(review): assumes check.is_good is a real bool output field;
        # if the signature yields a string, "False" would be truthy — confirm.
        if not check.is_good:
            return self.refine(
                task=task,
                draft=draft.output,
                feedback=check.feedback,
            )
        return draft
class EnsemblePipeline(dspy.Module):
    """Generate several candidate answers, then have a judge pick the best one."""

    def __init__(self, num_candidates=5):
        super().__init__()  # register sub-modules with DSPy
        self.generators = [
            dspy.ChainOfThought(GenerateAnswer) for _ in range(num_candidates)
        ]
        self.judge = dspy.ChainOfThought(PickBestAnswer)

    def forward(self, question):
        # Stage 1: Generate multiple candidates.
        # NOTE(review): identical modules can produce identical candidates
        # unless sampling temperature > 0 — confirm the LM settings.
        candidates = [gen(question=question).answer for gen in self.generators]
        # Stage 2: Pick the best
        return self.judge(
            question=question,
            candidates=candidates,
        )
class PickBestAnswer(dspy.Signature):
    """Pick the best answer from the candidates."""

    question: str = dspy.InputField()
    candidates: list[str] = dspy.InputField(desc="Multiple answer candidates")
    best_answer: str = dspy.OutputField(desc="The most accurate and complete answer")
    reasoning: str = dspy.OutputField(desc="Why this answer was chosen")
class ParallelAnalysis(dspy.Module):
    """Fan out three independent analyses over the same text, then merge them."""

    def __init__(self):
        super().__init__()  # register sub-modules with DSPy
        self.sentiment = dspy.ChainOfThought(AnalyzeSentiment)
        self.topics = dspy.ChainOfThought(ExtractTopics)
        self.entities = dspy.ChainOfThought(ExtractEntities)
        self.summarize = dspy.ChainOfThought(CombineAnalysis)

    def forward(self, text):
        # Fan out: the three analyses are independent of each other.
        # NOTE(review): as written these run sequentially; use DSPy's
        # async/parallel facilities for true concurrency — confirm.
        sent = self.sentiment(text=text)
        topics = self.topics(text=text)
        entities = self.entities(text=text)
        # Merge results
        return self.summarize(
            text=text,
            sentiment=sent.sentiment,
            topics=topics.topics,
            entities=entities.entities,
        )
class IterativeRefiner(dspy.Module):
    """Generate a draft, then evaluate-and-improve up to max_iterations times."""

    def __init__(self, max_iterations=3):
        super().__init__()  # register sub-modules with DSPy
        self.generate = dspy.ChainOfThought(GenerateDraft)
        self.evaluate = dspy.ChainOfThought(EvaluateDraft)
        self.improve = dspy.ChainOfThought(ImproveDraft)
        self.max_iterations = max_iterations

    def forward(self, task):
        draft = self.generate(task=task)
        for _ in range(self.max_iterations):  # loop index unused
            evaluation = self.evaluate(task=task, draft=draft.output)
            # NOTE(review): assumes evaluation.score is numeric; if the
            # signature returns a string, wrap it in float() — confirm.
            if evaluation.score >= 0.9:
                break  # good enough — stop refining early
            draft = self.improve(
                task=task,
                draft=draft.output,
                feedback=evaluation.feedback,
            )
        return draft
# Per-stage model routing: cheap model for the easy stage, strong model
# for the quality-critical stage.
expensive_lm = dspy.LM("openai/gpt-4o")
cheap_lm = dspy.LM("openai/gpt-4o-mini")

pipeline = SupportPipeline()
# Cheap model for classification (simple task)
pipeline.classify.lm = cheap_lm
# Expensive model for drafting (needs quality)
pipeline.draft.lm = expensive_lm
def pipeline_metric(example, prediction, trace=None):
    """Exact-match metric over the pipeline's final output.

    Returns True when the predicted response equals the gold response
    after lowercasing and stripping surrounding whitespace. `trace` is
    accepted for DSPy's optimizer protocol and is unused here.
    """
    def _norm(text):
        # Normalize so trivial case/whitespace differences don't fail the match.
        return text.lower().strip()

    return _norm(prediction.response) == _norm(example.response)
# MIPROv2 optimizes the prompts of ALL pipeline stages jointly,
# using the end-to-end metric on the final output.
optimizer = dspy.MIPROv2(metric=pipeline_metric, auto="medium")
optimized = optimizer.compile(pipeline, trainset=trainset)
| If your pipeline... | Use |
|---|---|
| Steps run in a fixed order | DSPy pipeline (this skill) |
| Steps branch based on results | DSPy pipeline with conditional logic in `forward()` |
| Needs cycles (retry loops, agent loops) | LangGraph |
| Needs persistent state across calls | LangGraph with checkpointing |
| Needs human approval mid-pipeline | LangGraph |
| Coordinates multiple independent agents | LangGraph supervisor pattern |
import dspy
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class PipelineState(TypedDict):
    """Shared state flowing through the LangGraph pipeline."""

    input_text: str  # raw user input
    category: str    # written by the classify node
    output: str      # written by the generate node
# DSPy modules
# Compact string signatures: "inputs -> outputs"; DSPy parses the field names.
classifier = dspy.ChainOfThought("text -> category")
generator = dspy.ChainOfThought("text, category -> output")
# Wrap as LangGraph nodes
def classify_node(state: PipelineState) -> dict:
    """Classify the input text; return a partial state update for LangGraph to merge."""
    result = classifier(text=state["input_text"])
    return {"category": result.category}
def generate_node(state: PipelineState) -> dict:
    """Generate the output from the input text and its category; return a partial state update."""
    result = generator(text=state["input_text"], category=state["category"])
    return {"output": result.output}
# Build the two-node linear graph: START -> classify -> generate -> END
graph = StateGraph(PipelineState)
graph.add_node("classify", classify_node)
graph.add_node("generate", generate_node)
graph.add_edge(START, "classify")
graph.add_edge("classify", "generate")
graph.add_edge("generate", END)
app = graph.compile()