ai-decomposing-tasks


Decompose a Failing AI Task


Guide the user through splitting a single unreliable AI step into multiple reliable subtasks. The insight: when a single prompt fails on complex inputs, restructuring the task — not just tweaking the prompt — is often the fix.

Step 1: Diagnose why single-step fails


Ask the user:
  1. What's the task? (extraction, classification, generation, etc.)
  2. When does it work? (simple inputs, short text, single items)
  3. When does it fail? (long documents, many items, mixed formats)

Common failure modes


Look at the errors. They usually fall into one of these patterns:
| Failure mode | What you see | Root cause |
|---|---|---|
| Missed items | Extracts 3 of 7 line items | Input overwhelms the context — too much to track at once |
| Conflated fields | Mixes up sender/recipient addresses | Multiple similar things extracted simultaneously |
| Inconsistent results | Works on invoice A, fails on invoice B | Different input formats need different handling |
| Degraded accuracy | 95% on short text, 60% on long text | Input length exceeds what a single pass can reliably process |
If the task works on simple inputs but fails on complex ones, decomposition is the right lever. If it fails on everything, try /ai-improving-accuracy first.

Step 2: Choose a decomposition strategy


Match the failure mode to a pattern:
What's going wrong?
|
+- Input is too long, AI loses focus
|  → Chunk-then-process (Step 3)
|
+- AI conflates multiple similar things
|  → Sequential extraction (Step 4)
|
+- AI misses items in variable-length lists
|  → Identify-then-process (Step 5)
|
+- Different input types need different handling
|  → Classify-then-specialize (see /ai-building-pipelines)
You can combine strategies. A long document with variable-length lists might need chunking and identify-then-process.
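The branching above can also be read as a simple lookup table. A tiny sketch, using hypothetical shorthand labels for the symptoms in the tree:

```python
# Hypothetical shorthand labels for the branches above.
STRATEGIES = {
    "input_too_long": "chunk-then-process",
    "conflates_similar_items": "sequential-extraction",
    "misses_list_items": "identify-then-process",
    "mixed_input_types": "classify-then-specialize",
}

def choose_strategy(symptom: str) -> str:
    """Map an observed failure symptom to the suggested decomposition pattern."""
    return STRATEGIES.get(symptom, "re-diagnose: unknown failure mode")
```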

Step 3: Chunk-then-process


Split long input into overlapping chunks, process each, then deduplicate results.
When to use: Input exceeds what the model can reliably process in one pass. Typical signs: accuracy drops sharply as input length grows.
```python
import dspy
from pydantic import BaseModel, Field

class ExtractedItem(BaseModel):
    name: str
    value: str
    source_text: str = Field(description="The exact text this was extracted from")

class ExtractFromChunk(dspy.Signature):
    """Extract all relevant items from this section of the document."""
    chunk: str = dspy.InputField(desc="A section of the document")
    items: list[ExtractedItem] = dspy.OutputField(desc="All items found in this section")

class ChunkAndExtract(dspy.Module):
    def __init__(self, chunk_size=2000, overlap=200):
        super().__init__()
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.extract = dspy.ChainOfThought(ExtractFromChunk)

    def _chunk_text(self, text: str) -> list[str]:
        """Split text into overlapping word-based chunks."""
        words = text.split()
        chunks = []
        start = 0
        while start < len(words):
            end = start + self.chunk_size
            chunk = " ".join(words[start:end])
            chunks.append(chunk)
            start = end - self.overlap
        return chunks

    def _deduplicate(self, all_items: list[ExtractedItem]) -> list[ExtractedItem]:
        """Remove duplicate extractions from overlapping chunks."""
        seen = set()
        unique = []
        for item in all_items:
            key = (item.name.lower().strip(), item.value.lower().strip())
            if key not in seen:
                seen.add(key)
                unique.append(item)
        return unique

    def forward(self, document: str):
        chunks = self._chunk_text(document)
        all_items = []

        for chunk in chunks:
            result = self.extract(chunk=chunk)
            all_items.extend(result.items)

        unique_items = self._deduplicate(all_items)
        return dspy.Prediction(items=unique_items)
```
Key details:
  • Overlap prevents items at chunk boundaries from being split and missed
  • Paragraph-aware splitting is better than raw character splitting — try to break at `\n\n` boundaries
  • Deduplication is essential because overlapping chunks will extract the same items twice
  • Include `source_text` in the output so you can trace extractions back to the document
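Following the second bullet, a paragraph-aware chunker could replace the word-window approach. This is a minimal sketch; `chunk_paragraphs` and its parameters are illustrative, not part of the module above:

```python
def chunk_paragraphs(text: str, chunk_size: int = 2000, overlap: int = 1) -> list[str]:
    """Greedily pack paragraphs (split on blank lines) into word-budgeted chunks.

    `overlap` is the number of trailing paragraphs repeated at the start of
    the next chunk, so an item spanning a boundary appears whole somewhere.
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks: list[str] = []
    current: list[str] = []
    current_words = 0
    for para in paragraphs:
        n = len(para.split())
        if current and current_words + n > chunk_size:
            chunks.append("\n\n".join(current))
            current = current[-overlap:] if overlap else []  # carry tail paragraphs forward
            current_words = sum(len(p.split()) for p in current)
        current.append(para)
        current_words += n
    if current:
        chunks.append("\n\n".join(current))
    return chunks
```

Unlike fixed word windows, no paragraph is ever split across chunks; deduplication still removes the repeats introduced by `overlap`.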

Step 4: Sequential extraction (the Salomatic pattern)


Extract one thing first, then use that result to constrain the next extraction. This is the pattern that took a medical report system from 40% error rate to near-zero.
When to use: The AI conflates multiple similar things, or extracting everything at once overwhelms it.
```python
class IdentifyPanels(dspy.Signature):
    """Identify all lab test panels in the medical report."""
    report: str = dspy.InputField(desc="Medical lab report")
    panel_names: list[str] = dspy.OutputField(desc="Names of all test panels found")

class LabResult(BaseModel):
    test_name: str
    value: str
    unit: str
    reference_range: str
    flag: str = Field(description="'normal', 'high', or 'low'")

class ExtractPanelResults(dspy.Signature):
    """Extract all test results for a specific panel from the report."""
    report: str = dspy.InputField(desc="Medical lab report")
    panel_name: str = dspy.InputField(desc="The specific panel to extract results for")
    results: list[LabResult] = dspy.OutputField(desc="All test results for this panel")

class SequentialExtractor(dspy.Module):
    def __init__(self):
        super().__init__()
        self.identify = dspy.ChainOfThought(IdentifyPanels)
        self.extract = dspy.ChainOfThought(ExtractPanelResults)

    def forward(self, report: str):
        # Step 1: Identify what's in the report
        panels = self.identify(report=report)

        dspy.Assert(
            len(panels.panel_names) > 0,
            "No test panels found — is this a valid lab report?"
        )

        # Step 2: Extract results per panel
        all_results = {}
        for panel_name in panels.panel_names:
            result = self.extract(report=report, panel_name=panel_name)
            all_results[panel_name] = result.results

        return dspy.Prediction(
            panels=panels.panel_names,
            results=all_results,
        )
```
Why this works:
  • Step 1 is easy — just identify panel names (low cognitive load)
  • Step 2 is focused — extract results for one specific panel at a time
  • The model doesn't have to juggle "find all panels AND extract all results" simultaneously
  • Each extraction is scoped to a smaller, well-defined subtask
This same pattern applies beyond medical reports — any time you're extracting multiple groups of similar things (invoice sections, resume sections, contract clauses).
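The two-pass scoping move is independent of any LM, which makes the structure easy to unit-test. A hypothetical plain-Python analogue, where ':'-terminated lines stand in for panel headers (both helper names are illustrative):

```python
import re

HEADER = re.compile(r"^(\w[\w ]*):$", re.M)

def identify_sections(text: str) -> list[str]:
    """Pass 1: just list the section headers (low cognitive load)."""
    return [m.group(1) for m in HEADER.finditer(text)]

def extract_section(text: str, name: str) -> list[str]:
    """Pass 2: pull only the lines scoped to one named section."""
    out, in_section = [], False
    for line in text.splitlines():
        stripped = line.strip()
        if stripped == f"{name}:":
            in_section = True
            continue
        if HEADER.match(stripped):
            in_section = False  # a new header closes the current section
        if in_section and stripped:
            out.append(stripped)
    return out
```

Pass 1 constrains pass 2 exactly the way `IdentifyPanels` constrains `ExtractPanelResults`: the second step never has to decide what to look for, only where.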

Step 5: Identify-then-process


First count or name the items, then process each one individually. This prevents the "missed items" failure where the model extracts 3 of 7 items.
When to use: Variable-length lists where the model consistently misses items.
```python
class IdentifyLineItems(dspy.Signature):
    """Identify all line items in the invoice. List every item, even small ones."""
    invoice_text: str = dspy.InputField(desc="Raw invoice text")
    item_descriptions: list[str] = dspy.OutputField(
        desc="Brief description of each line item, in order they appear"
    )

class LineItemDetail(BaseModel):
    description: str
    quantity: int
    unit_price: float
    total: float

class ExtractLineItem(dspy.Signature):
    """Extract the details for a specific line item from the invoice."""
    invoice_text: str = dspy.InputField(desc="Raw invoice text")
    item_description: str = dspy.InputField(desc="The specific item to extract details for")
    details: LineItemDetail = dspy.OutputField()

class IdentifyThenExtract(dspy.Module):
    def __init__(self):
        super().__init__()
        self.identify = dspy.ChainOfThought(IdentifyLineItems)
        self.extract_item = dspy.ChainOfThought(ExtractLineItem)

    def forward(self, invoice_text: str):
        # Step 1: Identify all items (just names — low cognitive load)
        items = self.identify(invoice_text=invoice_text)

        dspy.Assert(
            len(items.item_descriptions) > 0,
            "No line items found in invoice"
        )

        # Step 2: Extract details per item
        line_items = []
        for desc in items.item_descriptions:
            result = self.extract_item(
                invoice_text=invoice_text,
                item_description=desc,
            )
            line_items.append(result.details)

        return dspy.Prediction(line_items=line_items)
```
The identify step works as an "attention anchor" — once the model has listed all items, the extraction step knows exactly what to look for and is much less likely to skip anything.
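Because step 1 yields an explicit checklist, coverage can also be verified mechanically after step 2. A small sketch; the `missing_items` helper and its inputs are assumptions, not part of the module above:

```python
def missing_items(identified: list[str], extracted_descriptions: list[str]) -> list[str]:
    """Return identified items for which no extraction came back (case-insensitive)."""
    got = {d.lower().strip() for d in extracted_descriptions}
    return [item for item in identified if item.lower().strip() not in got]
```

Anything this returns is a concrete, per-item failure you can log, retry, or assert on, rather than a silent drop.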

Step 6: Compare single-step vs decomposed


Always measure the improvement. The decomposed version costs more (multiple LM calls), so you need to verify the accuracy gain justifies the cost:
```python
from dspy.evaluate import Evaluate

# Build both versions
single_step = dspy.ChainOfThought(ExtractAllItems)  # Original single-step
decomposed = IdentifyThenExtract()                  # Decomposed version

def extraction_metric(example, prediction, trace=None):
    """Measure recall — what fraction of gold items were extracted."""
    gold_items = set(item.lower() for item in example.item_names)
    pred_items = set(item.description.lower() for item in prediction.line_items)
    if not gold_items:
        return 1.0
    return len(gold_items & pred_items) / len(gold_items)

evaluator = Evaluate(devset=devset, metric=extraction_metric, num_threads=4, display_table=5)

# Compare
single_score = evaluator(single_step)
decomposed_score = evaluator(decomposed)

print(f"Single-step: {single_score:.1f}%")
print(f"Decomposed:  {decomposed_score:.1f}%")
```

Stratify by complexity


The real value of decomposition shows on complex inputs. Measure separately:
```python
simple_devset = [ex for ex in devset if len(ex.item_names) <= 3]
complex_devset = [ex for ex in devset if len(ex.item_names) > 3]

simple_evaluator = Evaluate(devset=simple_devset, metric=extraction_metric)
complex_evaluator = Evaluate(devset=complex_devset, metric=extraction_metric)

print("Simple inputs:")
print(f"  Single-step: {simple_evaluator(single_step):.1f}%")
print(f"  Decomposed:  {simple_evaluator(decomposed):.1f}%")

print("Complex inputs:")
print(f"  Single-step: {complex_evaluator(single_step):.1f}%")
print(f"  Decomposed:  {complex_evaluator(decomposed):.1f}%")
```
If the decomposed version doesn't significantly outperform on complex inputs, you may not need the decomposition. Stick with the simpler single-step approach.
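To put a number on "costs more" before committing, count LM calls: under identify-then-process, each document takes one identify call plus one extract call per item. A rough sketch (the helper name is illustrative):

```python
def estimated_calls(items_per_doc: list[int]) -> dict[str, int]:
    """Compare LM call counts: one call per doc vs identify + per-item extraction."""
    return {
        "single_step": len(items_per_doc),
        "decomposed": sum(1 + n for n in items_per_doc),
    }
```

Weigh this multiplier against the accuracy gap on complex inputs when deciding whether decomposition pays for itself.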

Step 7: Optimize end-to-end


MIPROv2 can optimize all stages of your decomposed pipeline together. This is powerful because the identify step learns to produce outputs that help the extract step:
```python
optimizer = dspy.MIPROv2(metric=extraction_metric, auto="medium")
optimized = optimizer.compile(decomposed, trainset=trainset)
```

Verify improvement

```python
optimized_score = evaluator(optimized)
print(f"Decomposed (unoptimized): {decomposed_score:.1f}%")
print(f"Decomposed (optimized):   {optimized_score:.1f}%")
```

Use different models per stage


The identify step (listing items) is simpler than the extract step (pulling details). Use a cheaper model for the easy step:
```python
cheap_lm = dspy.LM("openai/gpt-4o-mini")
quality_lm = dspy.LM("openai/gpt-4o")

decomposed.identify.set_lm(cheap_lm)       # Cheap for listing
decomposed.extract_item.set_lm(quality_lm) # Quality for extraction
```
See /ai-cutting-costs for more cost strategies.

Key patterns


  • Decompose when complexity causes failures — if it works on simple inputs but fails on complex ones, restructure
  • Identify-then-process prevents missed items — listing first creates an attention anchor
  • Sequential extraction prevents conflation — extract one type of thing at a time
  • Chunking handles long documents — overlap chunks and deduplicate results
  • Always compare against single-step — decomposition costs more, so verify the accuracy gain
  • Stratify by complexity — the payoff shows on complex inputs, not simple ones
  • Optimize end-to-end — MIPROv2 tunes all stages together for best results
  • Cheap models for easy stages — the identify step rarely needs an expensive model

Additional resources


  • For worked examples (medical reports, invoices, resumes), see examples.md
  • Already know your pipeline stages? Use /ai-building-pipelines to wire them together
  • Need to improve accuracy within a single step? Use /ai-improving-accuracy
  • Need to extract structured data? Start with /ai-parsing-data — decompose only if it struggles on complex inputs
  • Next: /ai-improving-accuracy to measure and optimize your decomposed pipeline