Query Decomposition
Break complex queries into independent concepts for parallel retrieval and fusion.
Overview
Use query decomposition for:

- Complex queries spanning multiple topics or concepts
- Multi-hop questions requiring chained reasoning
- Queries where a single retrieval pass misses relevant documents
- Improving recall for compound questions
The Problem
Complex queries span multiple topics that may not co-occur in single documents:

```
Query: "How do chunking strategies affect reranking in RAG?"
→ Single search may miss docs about chunking OR reranking
→ Poor coverage across all concepts
```

The Solution
Decompose into independent concepts, retrieve separately, then fuse:

```
Query: "How do chunking strategies affect reranking in RAG?"
→ Concepts: ["chunking strategies", "reranking methods", "RAG pipeline"]
→ Search each concept independently
→ Fuse results with Reciprocal Rank Fusion (RRF)
→ Full coverage across all topics
```

Implementation
1. Heuristic Detection (Fast Path)
```python
MULTI_CONCEPT_INDICATORS = [
    " vs ", " versus ", " compared to ", " or ",
    " and ", " with ", " affect ", " impact ",
    "difference between", "relationship between",
]

def is_multi_concept_heuristic(query: str) -> bool:
    """Fast check for multi-concept indicators (<1ms)."""
    query_lower = query.lower()
    return any(ind in query_lower for ind in MULTI_CONCEPT_INDICATORS)
```
2. LLM Decomposition
```python
from openai import AsyncOpenAI
from pydantic import BaseModel, Field

class ConceptExtraction(BaseModel):
    """LLM output schema for concept extraction."""

    concepts: list[str] = Field(
        ...,
        min_length=1,
        max_length=5,
        description="Distinct concepts from the query",
    )
    reasoning: str | None = None

async def decompose_query(
    query: str,
    llm: AsyncOpenAI,
) -> list[str]:
    """Extract independent concepts using LLM."""
    response = await llm.chat.completions.create(
        model="gpt-5.2-mini",
        messages=[
            {"role": "system", "content": """
Extract 2-4 independent concepts from this query.
Each concept should be searchable on its own.
Output JSON: {"concepts": ["concept1", "concept2"], "reasoning": "..."}
"""},
            {"role": "user", "content": query},
        ],
        response_format={"type": "json_object"},
        temperature=0,
    )
    result = ConceptExtraction.model_validate_json(
        response.choices[0].message.content
    )
    return result.concepts
```
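A minimal driver for the running example. It assumes `OPENAI_API_KEY` is set in the environment, and the printed concepts are illustrative rather than a guaranteed model output:

```python
import asyncio

async def main() -> None:
    llm = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    concepts = await decompose_query(
        "How do chunking strategies affect reranking in RAG?", llm
    )
    # e.g. ["chunking strategies", "reranking methods", "RAG pipeline"]
    print(concepts)

asyncio.run(main())
```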
3. Parallel Retrieval + RRF Fusion
```python
import asyncio
from collections import defaultdict

async def decomposed_search(
    query: str,
    search_fn: callable,
    llm: AsyncOpenAI,
    top_k: int = 10,
) -> list[dict]:
    """Search with query decomposition and RRF fusion."""
    # Check if decomposition is needed
    if not is_multi_concept_heuristic(query):
        return await search_fn(query, limit=top_k)

    # Decompose into concepts
    concepts = await decompose_query(query, llm)
    if len(concepts) <= 1:
        return await search_fn(query, limit=top_k)

    # Parallel retrieval for each concept
    tasks = [search_fn(concept, limit=top_k) for concept in concepts]
    results_per_concept = await asyncio.gather(*tasks)

    # RRF fusion
    return reciprocal_rank_fusion(results_per_concept, k=60)

def reciprocal_rank_fusion(
    result_lists: list[list[dict]],
    k: int = 60,
) -> list[dict]:
    """Combine ranked lists using RRF."""
    scores: defaultdict[str, float] = defaultdict(float)
    docs: dict[str, dict] = {}
    for results in result_lists:
        for rank, doc in enumerate(results, start=1):
            doc_id = doc["id"]
            scores[doc_id] += 1.0 / (k + rank)
            docs[doc_id] = doc
    # Sort by RRF score
    ranked_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
    return [docs[doc_id] for doc_id in ranked_ids]
```
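A quick sanity check of the fusion math with hypothetical doc IDs: a document ranked first in two concept lists scores 2/(60+1) ≈ 0.0328 and outranks a document that appears at rank 1 in only one list (1/61 ≈ 0.0164):

```python
# Hypothetical results for two concepts; only "a" appears in both lists.
fused = reciprocal_rank_fusion([
    [{"id": "a"}, {"id": "b"}],  # concept 1: a -> 1/61, b -> 1/62
    [{"id": "a"}, {"id": "c"}],  # concept 2: a adds another 1/61
])
assert fused[0]["id"] == "a"  # 2/61 beats every single-list score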
Complete Service
Concept extraction is factored into `get_concepts` so other pipelines (like the HyDE combination below) can reuse it:

```python
class QueryDecomposer:
    def __init__(self, llm, search_fn):
        self.llm = llm
        self.search_fn = search_fn
        self._cache: dict[str, list[str]] = {}

    async def get_concepts(self, query: str) -> list[str]:
        """Decompose a query into concepts, with caching."""
        # Fast path: single concept, no LLM call
        if not is_multi_concept_heuristic(query):
            return [query]
        # Check cache before calling the LLM
        cache_key = query.lower().strip()
        if cache_key not in self._cache:
            self._cache[cache_key] = await decompose_query(query, self.llm)
        return self._cache[cache_key]

    async def search(
        self,
        query: str,
        top_k: int = 10,
    ) -> list[dict]:
        """Search with automatic decomposition."""
        concepts = await self.get_concepts(query)
        # Single concept: plain search on the original query
        if len(concepts) <= 1:
            return await self.search_fn(query, limit=top_k)
        # Parallel retrieval per concept
        tasks = [self.search_fn(c, limit=top_k) for c in concepts]
        results_per_concept = await asyncio.gather(*tasks)
        # Fuse with RRF
        return reciprocal_rank_fusion(results_per_concept)[:top_k]
```
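A wiring sketch with a stubbed backend (`fake_search` is a hypothetical stand-in for a real vector or keyword index). The simple query takes the heuristic fast path, so the `llm` argument is never touched:

```python
async def fake_search(query: str, limit: int = 10) -> list[dict]:
    # Stand-in for a real search backend.
    return [{"id": f"{query}-{i}"} for i in range(limit)]

async def demo() -> None:
    decomposer = QueryDecomposer(llm=None, search_fn=fake_search)
    # No multi-concept indicator -> fast path, no LLM call
    results = await decomposer.search("What is HyDE?", top_k=3)
    print([r["id"] for r in results])

asyncio.run(demo())
```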
Combining with HyDE
```python
# HyDEService is defined in the hyde-retrieval skill (see Related Skills).
async def decomposed_hyde_search(
    query: str,
    decomposer: QueryDecomposer,
    hyde_service: HyDEService,
    vector_search: callable,
    top_k: int = 10,
) -> list[dict]:
    """Best of both: decomposition + HyDE for each concept."""
    # Decompose query
    concepts = await decomposer.get_concepts(query)
    # Generate HyDE for each concept in parallel
    hyde_results = await asyncio.gather(*[
        hyde_service.generate(concept) for concept in concepts
    ])
    # Search with HyDE embeddings
    search_tasks = [
        vector_search(embedding=hr.embedding, limit=top_k)
        for hr in hyde_results
    ]
    results_per_concept = await asyncio.gather(*search_tasks)
    # Fuse results
    return reciprocal_rank_fusion(results_per_concept)[:top_k]
```
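One decomposition call plus one HyDE generation per concept precede retrieval here; running the generations in parallel, as above, keeps wall-clock latency close to a single HyDE round trip.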
When to Decompose
| Query Type | Decompose? |
|---|---|
| "What is X?" | No |
| "X vs Y" | Yes |
| "How does X affect Y?" | Yes |
| "Best practices for X" | No |
| "X and Y in Z" | Yes |
| "Difference between X, Y, Z" | Yes |
Performance Tips
- Use heuristics first (sub-millisecond)
- Cache decomposition results
- Limit to 2-4 concepts max
- Set a timeout with fallback to the original query (see the sketch after this list)
- Combine with HyDE for vocabulary bridging
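A minimal sketch of the timeout fallback; the 2-second budget and the helper name are assumptions to tune per deployment:

```python
async def decompose_with_timeout(
    query: str,
    llm: AsyncOpenAI,
    timeout_s: float = 2.0,  # assumed budget; tune for your latency SLO
) -> list[str]:
    """Decompose, falling back to the original query if the LLM is slow."""
    try:
        return await asyncio.wait_for(decompose_query(query, llm), timeout_s)
    except asyncio.TimeoutError:
        # Timed out: retrieve with the original query rather than blocking.
        return [query]
```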
Related Skills
- rag-retrieval: Core RAG patterns enhanced by query decomposition
- hyde-retrieval: Combine with HyDE for vocabulary bridging per concept
- reranking-patterns: Rerank fused results for final precision
- embeddings: Embedding strategies for parallel concept retrieval
Key Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Decomposition detection | Heuristic first, LLM second | Sub-millisecond fast path for simple queries |
| Max concepts | 2-4 | More concepts increase latency without proportional benefit |
| Fusion algorithm | Reciprocal Rank Fusion (RRF) | Robust rank combination; its single constant (k=60) rarely needs tuning |
| LLM for decomposition | gpt-5.2-mini | Fast, cheap, good at concept extraction |