Query Decomposition

Break complex queries into independent concepts for parallel retrieval and fusion.

Overview

Use query decomposition for:

  • Complex queries spanning multiple topics or concepts
  • Multi-hop questions requiring chained reasoning
  • Queries where single retrieval misses relevant documents
  • Improving recall for compound questions

The Problem

Complex queries span multiple topics that may not co-occur in single documents:
```
Query: "How do chunking strategies affect reranking in RAG?"
→ Single search may miss docs about chunking OR reranking
→ Poor coverage across all concepts
```

The Solution

Decompose into independent concepts, retrieve separately, then fuse:
```
Query: "How do chunking strategies affect reranking in RAG?"
→ Concepts: ["chunking strategies", "reranking methods", "RAG pipeline"]
→ Search each concept independently
→ Fuse results with Reciprocal Rank Fusion (RRF)
→ Full coverage across all topics
```

Implementation

1. Heuristic Detection (Fast Path)

```python
MULTI_CONCEPT_INDICATORS = [
    " vs ", " versus ", " compared to ", " or ",
    " and ", " with ", " affect ", " impact ",
    "difference between", "relationship between",
]

def is_multi_concept_heuristic(query: str) -> bool:
    """Fast check for multi-concept indicators (<1ms)."""
    query_lower = query.lower()
    return any(ind in query_lower for ind in MULTI_CONCEPT_INDICATORS)
```

2. LLM Decomposition

```python
from pydantic import BaseModel, Field
from openai import AsyncOpenAI

class ConceptExtraction(BaseModel):
    """LLM output schema for concept extraction."""
    concepts: list[str] = Field(
        ...,
        min_length=1,
        max_length=5,
        description="Distinct concepts from the query",
    )
    reasoning: str | None = None

async def decompose_query(
    query: str,
    llm: AsyncOpenAI,
) -> list[str]:
    """Extract independent concepts using the LLM."""

    response = await llm.chat.completions.create(
        model="gpt-5.2-mini",
        messages=[
            {"role": "system", "content": """
Extract 2-4 independent concepts from this query.
Each concept should be searchable on its own.
Output JSON: {"concepts": ["concept1", "concept2"], "reasoning": "..."}
"""},
            {"role": "user", "content": query}
        ],
        response_format={"type": "json_object"},
        temperature=0,
    )

    result = ConceptExtraction.model_validate_json(
        response.choices[0].message.content
    )
    return result.concepts
```
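If you would rather not pull in a schema library for this step, the same guardrails can be sketched with the standard library. `parse_concepts` is an illustrative helper, not part of the API above; it assumes the LLM returned a JSON object with a `"concepts"` array:

```python
import json

def parse_concepts(raw: str, max_concepts: int = 4) -> list[str]:
    """Defensively parse the LLM's JSON: drop non-strings and blanks, cap the count."""
    data = json.loads(raw)
    concepts = [c.strip() for c in data.get("concepts", [])
                if isinstance(c, str) and c.strip()]
    return concepts[:max_concepts]

raw = '{"concepts": ["chunking strategies", " reranking methods ", ""], "reasoning": "ok"}'
print(parse_concepts(raw))  # → ['chunking strategies', 'reranking methods']
```

A `json.JSONDecodeError` here should trigger the same fallback as an empty result: search the original query.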

3. Parallel Retrieval + RRF Fusion

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

async def decomposed_search(
    query: str,
    search_fn: Callable[..., Awaitable[list[dict]]],
    llm: AsyncOpenAI,
    top_k: int = 10,
) -> list[dict]:
    """Search with query decomposition and RRF fusion."""

    # Check if decomposition is needed
    if not is_multi_concept_heuristic(query):
        return await search_fn(query, limit=top_k)

    # Decompose into concepts
    concepts = await decompose_query(query, llm)

    if len(concepts) <= 1:
        return await search_fn(query, limit=top_k)

    # Parallel retrieval for each concept
    tasks = [search_fn(concept, limit=top_k) for concept in concepts]
    results_per_concept = await asyncio.gather(*tasks)

    # RRF fusion
    return reciprocal_rank_fusion(results_per_concept, k=60)


def reciprocal_rank_fusion(
    result_lists: list[list[dict]],
    k: int = 60,
) -> list[dict]:
    """Combine ranked lists using RRF."""
    scores: defaultdict[str, float] = defaultdict(float)
    docs: dict[str, dict] = {}

    for results in result_lists:
        for rank, doc in enumerate(results, start=1):
            doc_id = doc["id"]
            scores[doc_id] += 1.0 / (k + rank)
            docs[doc_id] = doc

    # Sort by descending RRF score
    ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
    return [docs[doc_id] for doc_id in ranked_ids]
```
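To see why RRF favors documents that appear in several concept lists, plug ranks into the 1/(k + rank) formula by hand:

```python
k = 60

# "doc-both" is ranked 2nd for "chunking strategies" AND 2nd for "reranking methods";
# "doc-top" is ranked 1st, but only in a single concept's list.
score_both = 1 / (k + 2) + 1 / (k + 2)   # two lists: ~0.0323
score_top = 1 / (k + 1)                  # one list:  ~0.0164

# A document relevant to every concept outranks a single-list winner.
assert score_both > score_top
```

The constant k = 60 dampens the gap between adjacent ranks, so overlap across lists matters more than winning any one list.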

Complete Service

```python
class QueryDecomposer:
    def __init__(self, llm, search_fn):
        self.llm = llm
        self.search_fn = search_fn
        self._cache: dict[str, list[str]] = {}

    async def get_concepts(self, query: str) -> list[str]:
        """Return decomposed concepts, cached by normalized query.

        Single-concept queries come back unchanged as [query].
        """
        # Fast path: no multi-concept indicators
        if not is_multi_concept_heuristic(query):
            return [query]

        cache_key = query.lower().strip()
        if cache_key not in self._cache:
            self._cache[cache_key] = await decompose_query(query, self.llm)
        return self._cache[cache_key]

    async def search(
        self,
        query: str,
        top_k: int = 10,
    ) -> list[dict]:
        """Search with automatic decomposition."""
        concepts = await self.get_concepts(query)

        # Single concept: plain search on the original query
        if len(concepts) <= 1:
            return await self.search_fn(query, limit=top_k)

        # Parallel retrieval per concept
        tasks = [self.search_fn(c, limit=top_k) for c in concepts]
        results_per_concept = await asyncio.gather(*tasks)

        # Fuse with RRF
        return reciprocal_rank_fusion(results_per_concept)[:top_k]
```

Combining with HyDE

```python
async def decomposed_hyde_search(
    query: str,
    decomposer: QueryDecomposer,
    hyde_service: HyDEService,
    vector_search: callable,
    top_k: int = 10,
) -> list[dict]:
    """Best of both: decomposition + HyDE for each concept."""

    # Decompose query
    concepts = await decomposer.get_concepts(query)

    # Generate a HyDE document for each concept in parallel
    hyde_results = await asyncio.gather(*[
        hyde_service.generate(concept) for concept in concepts
    ])

    # Search with the HyDE embeddings
    search_tasks = [
        vector_search(embedding=hr.embedding, limit=top_k)
        for hr in hyde_results
    ]
    results_per_concept = await asyncio.gather(*search_tasks)

    # Fuse results
    return reciprocal_rank_fusion(results_per_concept)[:top_k]
```

When to Decompose

| Query type | Decompose? |
|---|---|
| "What is X?" | No |
| "X vs Y" | Yes |
| "How does X affect Y?" | Yes |
| "Best practices for X" | No |
| "X and Y in Z" | Yes |
| "Difference between X, Y, Z" | Yes |
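As a sanity check, the substring heuristic from step 1 agrees with this table (indicators restated here so the snippet runs on its own):

```python
MULTI_CONCEPT_INDICATORS = [
    " vs ", " versus ", " compared to ", " or ",
    " and ", " with ", " affect ", " impact ",
    "difference between", "relationship between",
]

def is_multi_concept_heuristic(query: str) -> bool:
    """True if the lowercased query contains any multi-concept indicator."""
    query_lower = query.lower()
    return any(ind in query_lower for ind in MULTI_CONCEPT_INDICATORS)

assert not is_multi_concept_heuristic("What is RAG?")
assert is_multi_concept_heuristic("chunking vs reranking")
assert is_multi_concept_heuristic("How does chunking affect reranking?")
assert not is_multi_concept_heuristic("Best practices for chunking")
assert is_multi_concept_heuristic("Difference between BM25, dense, and hybrid search")
```

Note the indicators are padded with spaces, so "for" does not trip " or " and "sandwich" does not trip " and ".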

Performance Tips

  • Use heuristics first (sub-millisecond)
  • Cache decomposition results
  • Limit to 2-4 concepts max
  • Set timeout with fallback to original query
  • Combine with HyDE for vocabulary bridging
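The timeout-with-fallback tip can be sketched with `asyncio.wait_for`; `decompose_with_fallback` is an illustrative wrapper, not part of the service above:

```python
import asyncio
from typing import Awaitable, Callable

async def decompose_with_fallback(
    query: str,
    decompose: Callable[[str], Awaitable[list[str]]],
    timeout_s: float = 2.0,
) -> list[str]:
    """Fall back to searching the original query if decomposition is slow or fails."""
    try:
        return await asyncio.wait_for(decompose(query), timeout=timeout_s)
    except Exception:  # timeout, API error, malformed JSON — degrade gracefully
        return [query]
```

Wrapping the whole decomposition call keeps the retrieval path's worst-case latency bounded: the query always gets searched, decomposed or not.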

Related Skills

  • rag-retrieval
    - Core RAG patterns enhanced by query decomposition
  • hyde-retrieval
    - Combine with HyDE for vocabulary bridging per concept
  • reranking-patterns
    - Rerank fused results for final precision
  • embeddings
    - Embedding strategies for parallel concept retrieval

Key Decisions

| Decision | Choice | Rationale |
|---|---|---|
| Decomposition detection | Heuristic first, LLM second | Sub-millisecond fast path for simple queries |
| Max concepts | 2-4 | More concepts increase latency without proportional benefit |
| Fusion algorithm | Reciprocal Rank Fusion (RRF) | Robust, parameter-free rank combination |
| Decomposition LLM | gpt-5.2-mini | Fast, cheap, good at concept extraction |
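In practice these decisions end up as tunable settings. A sketch of a config object with the table's defaults (field names and the timeout value are illustrative, not from the sections above):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class DecompositionConfig:
    """Defaults mirroring the key decisions above."""
    model: str = "gpt-5.2-mini"   # decomposition LLM
    max_concepts: int = 4         # cap on concept fan-out
    rrf_k: int = 60               # RRF smoothing constant
    timeout_s: float = 2.0        # LLM budget before falling back to the raw query

cfg = DecompositionConfig()
```

Freezing the dataclass makes the config safe to share across concurrent searches.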
