knowledge-ingestion-patterns
Knowledge Ingestion Patterns Skill
Systematic approaches for ingesting different content types into a RAG system, with chunking, metadata, and retrieval quality in mind.
Overview
Different content types require different ingestion strategies. This skill documents best practices for:
- Websites and web content
- PDF documents
- Code repositories
- Conversation exports
- Research notes
- API documentation
Core Principles
- Chunk for retrieval - Optimize chunk size for the questions you'll ask
- Metadata matters - Rich metadata enables filtered search
- Preserve context - Don't lose meaning when splitting
- Deduplicate - Avoid ingesting the same content twice (a content-hash sketch follows this list)
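A minimal sketch of that deduplication check, assuming nothing about the vector store (the dedupe helper and the seen_hashes set are illustrative, not part of this skill's API):
python
import hashlib
from typing import Dict, Iterable, Iterator, Set

def content_hash(chunk: Dict) -> str:
    """Stable fingerprint of a chunk's text, ignoring metadata."""
    normalized = " ".join(chunk["content"].split()).lower()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

def dedupe(chunks: Iterable[Dict], seen_hashes: Set[str]) -> Iterator[Dict]:
    """Yield only chunks whose content has not been ingested before."""
    for chunk in chunks:
        h = content_hash(chunk)
        if h in seen_hashes:
            continue  # exact duplicate of an already-ingested chunk
        seen_hashes.add(h)
        yield chunk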
Content Type Patterns
Pattern 2: PDF Documents
When to use: Research papers, reports, ebooks, scanned documents
Chunking Strategy: Page-aware with overlap, handle tables/figures specially
python
import fitz  # PyMuPDF
from typing import List, Dict

def chunk_pdf(pdf_path: str, chunk_size: int = 500) -> List[Dict]:
    """Extract and chunk PDF content with page awareness."""
    doc = fitz.open(pdf_path)
    chunks = []
    for page_num, page in enumerate(doc, 1):
        text = page.get_text()
        # Skip empty pages
        if not text.strip():
            continue
        # Split into paragraphs
        paragraphs = text.split('\n\n')
        current_chunk = ""
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            if len(current_chunk) + len(para) < chunk_size:
                current_chunk += " " + para
            else:
                if current_chunk:
                    chunks.append({
                        "content": current_chunk.strip(),
                        "metadata": {
                            "type": "pdf",
                            "source": pdf_path,
                            "page": page_num,
                            "total_pages": len(doc)
                        }
                    })
                current_chunk = para
        # Don't forget last chunk of page
        if current_chunk:
            chunks.append({
                "content": current_chunk.strip(),
                "metadata": {
                    "type": "pdf",
                    "source": pdf_path,
                    "page": page_num,
                    "total_pages": len(doc)
                }
            })
    return chunks

def extract_pdf_tables(pdf_path: str) -> List[Dict]:
    """Extract tables from PDF as separate chunks."""
    import pdfplumber
    tables = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_num, page in enumerate(pdf.pages, 1):
            for table_num, table in enumerate(page.extract_tables(), 1):
                # Convert table to markdown format
                if table:
                    headers = table[0]
                    rows = table[1:]
                    md_table = "| " + " | ".join(str(h) for h in headers) + " |\n"
                    md_table += "| " + " | ".join("---" for _ in headers) + " |\n"
                    for row in rows:
                        md_table += "| " + " | ".join(str(c) for c in row) + " |\n"
                    tables.append({
                        "content": md_table,
                        "metadata": {
                            "type": "pdf_table",
                            "source": pdf_path,
                            "page": page_num,
                            "table_number": table_num
                        }
                    })
    return tables
Metadata Schema:
yaml
type: pdf | pdf_table
source: file path
page: page number
total_pages: document length
table_number: (for tables) which table on page
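These chunks can go into whatever vector store you use; as one hedged example (Chroma is just an illustration here, not prescribed by this skill, and paper.pdf is a placeholder path), the type and page metadata fields enable filtered search:
python
import chromadb

client = chromadb.Client()
collection = client.get_or_create_collection("knowledge")

# chunk_pdf and extract_pdf_tables are defined above; "paper.pdf" is a placeholder.
chunks = chunk_pdf("paper.pdf") + extract_pdf_tables("paper.pdf")
collection.add(
    documents=[c["content"] for c in chunks],
    metadatas=[c["metadata"] for c in chunks],
    ids=[f"paper-{i}" for i in range(len(chunks))],
)

# Rich metadata enables filtered retrieval, e.g. restrict matches to extracted tables:
results = collection.query(
    query_texts=["summary of results"],
    n_results=3,
    where={"type": "pdf_table"},
)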
Pattern 4: Websites / Web Content
When to use: Documentation sites, articles, blog posts
Chunking Strategy: Clean HTML, respect structure, handle navigation
python
import httpx
from bs4 import BeautifulSoup
from typing import List, Dict
from urllib.parse import urljoin, urlparse

def chunk_webpage(url: str) -> List[Dict]:
    """Fetch and chunk a webpage."""
    response = httpx.get(url, follow_redirects=True)
    soup = BeautifulSoup(response.text, 'html.parser')
    # Remove noise
    for tag in soup.find_all(['nav', 'footer', 'aside', 'script', 'style']):
        tag.decompose()
    chunks = []
    # Find main content
    main = soup.find('main') or soup.find('article') or soup.find('body')
    # Chunk by sections
    for section in main.find_all(['section', 'div'], class_=lambda x: x and 'content' in str(x).lower()):
        text = section.get_text(separator=' ', strip=True)
        if len(text) > 100:  # Skip tiny sections
            chunks.append({
                "content": text,
                "metadata": {
                    "type": "webpage",
                    "source": url,
                    "domain": urlparse(url).netloc,
                    "title": soup.title.string if soup.title else ""
                }
            })
    # If no sections found, chunk the whole page
    if not chunks:
        text = main.get_text(separator=' ', strip=True)
        # Split into ~500 word chunks
        words = text.split()
        for i in range(0, len(words), 450):
            chunk_text = ' '.join(words[i:i+500])
            chunks.append({
                "content": chunk_text,
                "metadata": {
                    "type": "webpage",
                    "source": url,
                    "domain": urlparse(url).netloc,
                    "title": soup.title.string if soup.title else ""
                }
            })
    return chunks

async def crawl_site(start_url: str, max_pages: int = 50) -> List[Dict]:
    """Crawl a site and chunk all pages."""
    from urllib.parse import urlparse
    base_domain = urlparse(start_url).netloc
    visited = set()
    to_visit = [start_url]
    all_chunks = []
    async with httpx.AsyncClient() as client:
        while to_visit and len(visited) < max_pages:
            url = to_visit.pop(0)
            if url in visited:
                continue
            try:
                response = await client.get(url, follow_redirects=True)
                visited.add(url)
                # Chunk this page
                all_chunks.extend(chunk_webpage(url))
                # Find links to follow
                soup = BeautifulSoup(response.text, 'html.parser')
                for link in soup.find_all('a', href=True):
                    href = urljoin(url, link['href'])
                    if urlparse(href).netloc == base_domain and href not in visited:
                        to_visit.append(href)
            except Exception as e:
                print(f"Failed to fetch {url}: {e}")
    return all_chunks
Metadata Schema:
yaml
type: webpage
source: full URL
domain: domain name
title: page title
crawl_depth: (for crawls) how many links from start
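Note that crawl_site above does not populate crawl_depth; one way to carry it is to queue (url, depth) pairs. A minimal sketch (this variant is an assumption, not part of the original function):
python
import httpx
from bs4 import BeautifulSoup
from typing import List, Dict
from urllib.parse import urljoin, urlparse

async def crawl_site_with_depth(start_url: str, max_pages: int = 50, max_depth: int = 3) -> List[Dict]:
    """Variant of crawl_site that records how many links each page is from the start URL."""
    base_domain = urlparse(start_url).netloc
    visited = set()
    to_visit = [(start_url, 0)]  # queue (url, depth) pairs instead of bare URLs
    all_chunks = []
    async with httpx.AsyncClient() as client:
        while to_visit and len(visited) < max_pages:
            url, depth = to_visit.pop(0)
            if url in visited or depth > max_depth:
                continue
            try:
                response = await client.get(url, follow_redirects=True)
                visited.add(url)
                page_chunks = chunk_webpage(url)  # chunk_webpage is defined above
                for chunk in page_chunks:
                    chunk["metadata"]["crawl_depth"] = depth  # populate the schema field
                all_chunks.extend(page_chunks)
                soup = BeautifulSoup(response.text, 'html.parser')
                for link in soup.find_all('a', href=True):
                    href = urljoin(url, link['href'])
                    if urlparse(href).netloc == base_domain and href not in visited:
                        to_visit.append((href, depth + 1))
            except Exception as e:
                print(f"Failed to fetch {url}: {e}")
    return all_chunks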
Pattern 6: Research Notes
When to use: Personal notes, research findings, learnings
Chunking Strategy: By paragraph with topic extraction
python
from typing import List, Dict
from datetime import datetime

def chunk_research_notes(content: str, topic: str = None) -> List[Dict]:
    """Chunk research notes with topic awareness."""
    # Split by double newlines (paragraphs)
    paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]
    chunks = []
    current_topic = topic or "general"
    for para in paragraphs:
        # Check if this is a topic header
        if para.startswith('#') or (len(para) < 50 and para.endswith(':')):
            current_topic = para.strip('#: ')
            continue
        chunks.append({
            "content": para,
            "metadata": {
                "type": "research",
                "topic": current_topic,
                "ingested_at": datetime.now().isoformat(),
                "word_count": len(para.split())
            }
        })
    return chunks

def chunk_with_source_attribution(
    content: str,
    source_url: str = None,
    source_title: str = None,
    researcher: str = None
) -> List[Dict]:
    """Chunk research with full source attribution."""
    chunks = chunk_research_notes(content)
    for chunk in chunks:
        chunk["metadata"].update({
            "source_url": source_url,
            "source_title": source_title,
            "researcher": researcher
        })
    return chunks
Metadata Schema:
yaml
type: research
topic: extracted or assigned topic
source_url: where the info came from
source_title: title of source
researcher: who did the research
ingested_at: timestamp
word_count: chunk size
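A small usage example of the helpers above (the note text, URL, and researcher name are placeholders):
python
notes = """# Chunk size

Notes on picking a chunk size go here.

A second paragraph becomes its own chunk under the same topic.
"""

chunks = chunk_with_source_attribution(
    notes,
    source_url="https://example.com/notes",   # placeholder source
    source_title="Chunk size notes",
    researcher="example-researcher",
)
for chunk in chunks:
    print(chunk["metadata"]["topic"], "->", chunk["content"][:40])
# Both paragraphs get topic "Chunk size" plus the source attribution fields.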