Compare original and translation side by side

Document Folder
│
▼
┌─────────────────────┐
│ 1. Build Inventory │ SQLite catalog of all files
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 2. Extract Text │ PyMuPDF for regular PDFs
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 3. OCR Scanned PDFs │ Tesseract + pytesseract
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 4. Chunk Text │ 1000 chars, 200 overlap
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 5. Generate Embeds │ sentence-transformers
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 6. Semantic Search │ Cosine similarity
└─────────────────────┘

Document Folder
│
▼
┌─────────────────────┐
│ 1. Build Inventory │ SQLite catalog of all files
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 2. Extract Text │ PyMuPDF for regular PDFs
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 3. OCR Scanned PDFs │ Tesseract + pytesseract
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 4. Chunk Text │ 1000 chars, 200 overlap
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 5. Generate Embeds │ sentence-transformers
└──────────┬──────────┘
▼
┌─────────────────────┐
│ 6. Semantic Search │ Cosine similarity
└─────────────────────┘

pip install PyMuPDF pytesseract Pillow sentence-transformers numpy tqdm
uv pip install PyMuPDF pytesseract Pillow sentence-transformers numpy tqdm

import sqlite3
from pathlib import Path
from datetime import datetime
def create_database(db_path):
    """Create (or open) the SQLite catalog and ensure the schema exists.

    Args:
        db_path: Path to the SQLite database file (":memory:" also works).

    Returns:
        An open sqlite3.Connection with tables and indexes in place.
    """
    conn = sqlite3.connect(db_path, timeout=30)
    # SQLite does NOT enforce declared FOREIGN KEYs unless this pragma is
    # enabled per connection; without it text_chunks.document_id is unchecked.
    conn.execute('PRAGMA foreign_keys = ON')
    cursor = conn.cursor()
    # Documents table: one row per file discovered on disk.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            filepath TEXT UNIQUE NOT NULL,
            file_size INTEGER,
            file_type TEXT,
            page_count INTEGER,
            extraction_method TEXT,  -- 'text', 'ocr', 'failed', 'drm_protected'
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Text chunks table: overlapping text windows per document; the
    # embedding BLOB stays NULL until create_embeddings() fills it in.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS text_chunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            document_id INTEGER NOT NULL,
            chunk_num INTEGER NOT NULL,
            chunk_text TEXT NOT NULL,
            char_count INTEGER,
            embedding BLOB,
            embedding_model TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (document_id) REFERENCES documents(id),
            UNIQUE(document_id, chunk_num)
        )
    ''')
    # Indexes for the two hot lookups: chunks-by-document and file dedup.
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_chunks_doc_id ON text_chunks(document_id)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_docs_filepath ON documents(filepath)')
    conn.commit()
    return conn

import sqlite3
from pathlib import Path
from datetime import datetime
def create_database(db_path):
    """Open the SQLite catalog at ``db_path``, creating the schema if absent.

    Returns the open sqlite3.Connection after committing the DDL.
    """
    connection = sqlite3.connect(db_path, timeout=30)
    # All schema statements are idempotent (IF NOT EXISTS), so this is
    # safe to run against an existing database.
    schema_statements = (
        '''
        CREATE TABLE IF NOT EXISTS documents (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            filepath TEXT UNIQUE NOT NULL,
            file_size INTEGER,
            file_type TEXT,
            page_count INTEGER,
            extraction_method TEXT,  -- 'text', 'ocr', 'failed', 'drm_protected'
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS text_chunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            document_id INTEGER NOT NULL,
            chunk_num INTEGER NOT NULL,
            chunk_text TEXT NOT NULL,
            char_count INTEGER,
            embedding BLOB,
            embedding_model TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (document_id) REFERENCES documents(id),
            UNIQUE(document_id, chunk_num)
        )
        ''',
        'CREATE INDEX IF NOT EXISTS idx_chunks_doc_id ON text_chunks(document_id)',
        'CREATE INDEX IF NOT EXISTS idx_docs_filepath ON documents(filepath)',
    )
    cur = connection.cursor()
    for statement in schema_statements:
        cur.execute(statement)
    connection.commit()
    return connection

import fitz  # PyMuPDF
def extract_pdf_text(pdf_path):
    """Extract the embedded text layer from a PDF with PyMuPDF.

    Args:
        pdf_path: Path to the PDF file.

    Returns:
        A ``(text, method)`` pair: ``(full_text, "text")`` on success,
        ``(None, "no_text")`` when too little text was found (likely a
        scanned PDF), ``(None, "drm_protected")`` for encrypted files,
        or ``(None, "error: ...")`` on any other failure.
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            text_parts = []
            for page in doc:
                text = page.get_text()
                if text.strip():
                    text_parts.append(text)
        finally:
            # Close even if a page raises mid-iteration
            # (the original leaked the document handle here).
            doc.close()
        full_text = "\n".join(text_parts)
        # Under ~100 chars usually means an image-only PDF -> caller tries OCR.
        if len(full_text.strip()) < 100:
            return None, "no_text"
        return full_text, "text"
    except Exception as e:
        # PyMuPDF reports encryption in the exception message; classify it
        # so callers can count DRM-protected files separately.
        if "encrypted" in str(e).lower() or "drm" in str(e).lower():
            return None, "drm_protected"
        return None, f"error: {str(e)}"

import fitz  # PyMuPDF
def extract_pdf_text(pdf_path):
    """Pull the embedded text layer out of a PDF via PyMuPDF.

    Returns a ``(text, method)`` pair: the joined page text with method
    ``"text"``, or ``(None, reason)`` where reason is one of ``"no_text"``,
    ``"drm_protected"`` or an ``"error: ..."`` string.
    """
    try:
        doc = fitz.open(pdf_path)
        pages = []
        for page in doc:
            page_text = page.get_text()
            if page_text.strip():
                pages.append(page_text)
        doc.close()
        combined = "\n".join(pages)
        # Fewer than 100 meaningful characters -> treat as image-only.
        if len(combined.strip()) < 100:
            return None, "no_text"
        return combined, "text"
    except Exception as exc:
        message = str(exc).lower()
        # Encrypted/DRM files get their own status for reporting.
        if "encrypted" in message or "drm" in message:
            return None, "drm_protected"
        return None, f"error: {str(exc)}"

import fitz
import pytesseract
from PIL import Image
import io
def ocr_pdf(pdf_path, dpi=200):
    """OCR a scanned PDF page-by-page with Tesseract.

    Args:
        pdf_path: Path to the PDF file.
        dpi: Render resolution; higher is more accurate but slower.

    Returns:
        ``(text, "ocr")`` on success, ``(None, "ocr_failed")`` when OCR
        produced almost nothing, or ``(None, "ocr_error: ...")`` on failure.
    """
    try:
        doc = fitz.open(pdf_path)
        try:
            text_parts = []
            # PyMuPDF renders at 72 dpi by default; scale up via the matrix.
            # Loop-invariant, so built once (original rebuilt it per page).
            mat = fitz.Matrix(dpi / 72, dpi / 72)
            for page in doc:
                pix = page.get_pixmap(matrix=mat)
                # Round-trip through PNG bytes to get a PIL image for Tesseract.
                img = Image.open(io.BytesIO(pix.tobytes("png")))
                text = pytesseract.image_to_string(img, lang='eng')
                if text.strip():
                    text_parts.append(text)
        finally:
            # Close even when OCR raises mid-document (original leaked here).
            doc.close()
        full_text = "\n".join(text_parts)
        # Under ~100 chars means OCR effectively produced nothing useful.
        if len(full_text.strip()) < 100:
            return None, "ocr_failed"
        return full_text, "ocr"
    except Exception as e:
        return None, f"ocr_error: {str(e)}"

import fitz
import pytesseract
from PIL import Image
import io
def ocr_pdf(pdf_path, dpi=200):
"""OCR scanned PDF using Tesseract."""
try:
doc = fitz.open(pdf_path)
text_parts = []
for page_num in range(len(doc)):
page = doc[page_num]
# Convert page to image
mat = fitz.Matrix(dpi/72, dpi/72)
pix = page.get_pixmap(matrix=mat)
# Convert to PIL Image
img_data = pix.tobytes("png")
img = Image.open(io.BytesIO(img_data))
# OCR with Tesseract
text = pytesseract.image_to_string(img, lang='eng')
if text.strip():
text_parts.append(text)
doc.close()
full_text = "\n".join(text_parts)
if len(full_text.strip()) < 100:
return None, "ocr_failed"
return full_text, "ocr"
except Exception as e:
return None, f"ocr_error: {str(e)}"def chunk_text(text, chunk_size=1000, overlap=200):
"""Split text into overlapping chunks."""
chunks = []
start = 0
text_len = len(text)
while start < text_len:
end = start + chunk_size
chunk = text[start:end]
# Try to break at sentence boundary
if end < text_len:
last_period = chunk.rfind('.')
last_newline = chunk.rfind('\n')
break_point = max(last_period, last_newline)
if break_point > chunk_size * 0.7:
chunk = text[start:start + break_point + 1]
end = start + break_point + 1
chunks.append(chunk.strip())
start = end - overlap
if start >= text_len:
break
return chunksdef chunk_text(text, chunk_size=1000, overlap=200):
"""Split text into overlapping chunks."""
chunks = []
start = 0
text_len = len(text)
while start < text_len:
end = start + chunk_size
chunk = text[start:end]
# Try to break at sentence boundary
if end < text_len:
last_period = chunk.rfind('.')
last_newline = chunk.rfind('\n')
break_point = max(last_period, last_newline)
if break_point > chunk_size * 0.7:
chunk = text[start:start + break_point + 1]
end = start + break_point + 1
chunks.append(chunk.strip())
start = end - overlap
if start >= text_len:
break
return chunksfrom sentence_transformers import SentenceTransformer
import os
import pickle

import numpy as np

from sentence_transformers import SentenceTransformer
import os

def create_embeddings(db_path, model_name='all-MiniLM-L6-v2', batch_size=256):
    """Generate sentence embeddings for every chunk that lacks one.

    NOTE(review): the original ``def`` line was lost in this copy of the
    file; the signature is reconstructed from the call site
    ``create_embeddings(str(db_path))`` and the body's free variables
    (db_path, model_name, batch_size) -- confirm the intended defaults.

    Args:
        db_path: Path to the SQLite catalog.
        model_name: sentence-transformers model id; must match the model
            used at query time in semantic_search().
        batch_size: Number of chunks encoded per model.encode() call.
    """
    model = SentenceTransformer(model_name)
    conn = sqlite3.connect(db_path, timeout=30)
    cursor = conn.cursor()
    # Only chunks not yet embedded, so an interrupted job is resumable.
    cursor.execute('''
        SELECT id, chunk_text FROM text_chunks
        WHERE embedding IS NULL
    ''')
    chunks = cursor.fetchall()
    print(f"Generating embeddings for {len(chunks)} chunks...")
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        ids = [row[0] for row in batch]
        texts = [row[1] for row in batch]
        # Normalized vectors let search use a plain dot product as cosine.
        embeddings = model.encode(texts, normalize_embeddings=True)
        for chunk_id, emb in zip(ids, embeddings):
            # Stored as a pickled float32 array; semantic_search() unpickles.
            # NOTE(review): pickle is unsafe to load from untrusted DBs; the
            # blobs are only produced locally here, but float32 tobytes()/
            # np.frombuffer would be a safer, more portable format.
            emb_blob = pickle.dumps(emb.astype(np.float32))
            cursor.execute('''
                UPDATE text_chunks
                SET embedding = ?, embedding_model = ?
                WHERE id = ?
            ''', (emb_blob, model_name, chunk_id))
        # Commit per batch so progress survives interruption.
        conn.commit()
        print(f"  Embedded {min(i+batch_size, len(chunks))}/{len(chunks)}")
    conn.close()
    print("Embedding complete!")

def semantic_search(db_path, query, top_k=10, sample_size=50000):
    """Search for the chunks most similar to ``query`` (cosine similarity).

    Args:
        db_path: Path to the SQLite catalog.
        query: Natural-language query string.
        top_k: Number of results to return.
        sample_size: When more chunks than this are embedded, score a
            random sample instead of the full table (approximate search).

    Returns:
        List of dicts with ``id``, ``text`` (truncated to 500 chars),
        ``filename`` and ``score``, best match first.
    """
    # Force CPU mode -- avoids CUDA initialization for a single-query encode.
    os.environ["CUDA_VISIBLE_DEVICES"] = ""
    model = SentenceTransformer('all-MiniLM-L6-v2')
    query_emb = model.encode(query, normalize_embeddings=True)
    conn = sqlite3.connect(db_path, timeout=30)
    cursor = conn.cursor()
    cursor.execute('SELECT COUNT(*) FROM text_chunks WHERE embedding IS NOT NULL')
    total = cursor.fetchone()[0]
    if total > sample_size:
        # Random sample keeps latency bounded on very large databases.
        # LIMIT is bound as a parameter (original interpolated it into SQL).
        cursor.execute('''
            SELECT tc.id, tc.chunk_text, tc.embedding, d.filename
            FROM text_chunks tc
            JOIN documents d ON tc.document_id = d.id
            WHERE tc.embedding IS NOT NULL
            ORDER BY RANDOM()
            LIMIT ?
        ''', (sample_size,))
    else:
        cursor.execute('''
            SELECT tc.id, tc.chunk_text, tc.embedding, d.filename
            FROM text_chunks tc
            JOIN documents d ON tc.document_id = d.id
            WHERE tc.embedding IS NOT NULL
        ''')
    results = []
    for chunk_id, text, emb_blob, filename in cursor.fetchall():
        # Blobs were written by create_embeddings() above (trusted, local).
        emb = pickle.loads(emb_blob)
        # Both vectors are L2-normalized, so dot product == cosine similarity.
        similarity = np.dot(query_emb, emb)
        results.append({
            'id': chunk_id,
            'text': text[:500],  # Truncate for display
            'filename': filename,
            'score': float(similarity),
        })
    conn.close()
    results.sort(key=lambda x: x['score'], reverse=True)
    return results[:top_k]

#!/usr/bin/env python3
"""
Document RAG Pipeline - Build searchable knowledge base from PDF folder.
Usage:
python build_knowledge_base.py /path/to/documents --db inventory.db
python build_knowledge_base.py /path/to/documents --search "query text"
"""
import argparse
import os
from pathlib import Path
from tqdm import tqdm
def build_inventory(folder_path, db_path):
    """Catalog every PDF under ``folder_path`` into the documents table.

    Re-running is safe: rows are keyed by UNIQUE(filepath), so files that
    are already cataloged are skipped.

    Args:
        folder_path: Root directory to scan recursively.
        db_path: Path to the SQLite catalog (created if missing).
    """
    conn = create_database(db_path)
    cursor = conn.cursor()
    # Case-insensitive suffix match so '*.PDF' files are not missed
    # (the original rglob("*.pdf") skipped them on case-sensitive systems).
    pdf_files = [p for p in Path(folder_path).rglob("*") if p.suffix.lower() == ".pdf"]
    print(f"Found {len(pdf_files)} PDF files")
    for pdf_path in tqdm(pdf_files, desc="Building inventory"):
        # UNIQUE(filepath) makes this a no-op for already-cataloged files,
        # replacing the original SELECT-then-INSERT round trip.
        cursor.execute('''
            INSERT OR IGNORE INTO documents (filename, filepath, file_size, file_type)
            VALUES (?, ?, ?, 'pdf')
        ''', (pdf_path.name, str(pdf_path), pdf_path.stat().st_size))
    conn.commit()
    conn.close()
def process_documents(db_path, use_ocr=True):
    """Extract text from every not-yet-processed document, chunk and store it.

    Args:
        db_path: Path to the SQLite catalog.
        use_ocr: When True, fall back to Tesseract OCR for PDFs that have
            no embedded text layer.

    Returns:
        Counter dict: ``{'text': n, 'ocr': n, 'failed': n, 'drm': n}``.
    """
    conn = sqlite3.connect(db_path, timeout=30)
    cursor = conn.cursor()
    # extraction_method IS NULL marks documents never attempted.
    cursor.execute('''
        SELECT id, filepath FROM documents
        WHERE extraction_method IS NULL
    ''')
    docs = cursor.fetchall()
    stats = {'text': 0, 'ocr': 0, 'failed': 0, 'drm': 0}
    for doc_id, filepath in tqdm(docs, desc="Extracting text"):
        # Fast path: the PDF's embedded text layer.
        text, method = extract_pdf_text(filepath)
        # OCR fallback only for "no embedded text", not for hard errors/DRM.
        if text is None and use_ocr and method == "no_text":
            text, method = ocr_pdf(filepath)
        if text:
            chunks = chunk_text(text)
            for i, chunk in enumerate(chunks):
                # OR IGNORE: UNIQUE(document_id, chunk_num) makes re-runs safe.
                cursor.execute('''
                    INSERT OR IGNORE INTO text_chunks
                    (document_id, chunk_num, chunk_text, char_count)
                    VALUES (?, ?, ?, ?)
                ''', (doc_id, i, chunk, len(chunk)))
            stats['text' if method == 'text' else 'ocr'] += 1
        else:
            if 'drm' in method:
                stats['drm'] += 1
            else:
                stats['failed'] += 1
        # Record the outcome so the document is not re-attempted next run.
        cursor.execute('''
            UPDATE documents SET extraction_method = ? WHERE id = ?
        ''', (method, doc_id))
        # Commit per document so an interrupted run keeps its progress
        # (the original committed once at the very end of the whole batch).
        conn.commit()
    conn.close()
    return stats
def main():
    """CLI entry point: build/extend the knowledge base, or search it."""
    parser = argparse.ArgumentParser(description='Document RAG Pipeline')
    parser.add_argument('folder', help='Folder containing documents')
    parser.add_argument('--db', default='_inventory.db', help='Database path')
    parser.add_argument('--no-ocr', action='store_true', help='Skip OCR')
    parser.add_argument('--embed', action='store_true', help='Generate embeddings')
    parser.add_argument('--search', help='Search query')
    parser.add_argument('--top-k', type=int, default=10, help='Number of results')
    args = parser.parse_args()
    # The catalog database lives inside the document folder itself.
    db_path = Path(args.folder) / args.db
    if args.search:
        # Search mode: query an already-built knowledge base.
        results = semantic_search(str(db_path), args.search, args.top_k)
        print(f"\nTop {len(results)} results for: '{args.search}'\n")
        for i, r in enumerate(results, 1):
            print(f"{i}. [{r['score']:.3f}] {r['filename']}")
            print(f"   {r['text'][:200]}...\n")
    else:
        # Build mode: inventory -> extract -> (optionally) embed.
        print("Step 1: Building inventory...")
        build_inventory(args.folder, str(db_path))
        print("\nStep 2: Extracting text...")
        stats = process_documents(str(db_path), use_ocr=not args.no_ocr)
        print(f"Results: {stats}")
        if args.embed:
            print("\nStep 3: Generating embeddings...")
            create_embeddings(str(db_path))

if __name__ == '__main__':
    main()

#!/usr/bin/env python3
"""
Document RAG Pipeline - Build searchable knowledge base from PDF folder.
Usage:
python build_knowledge_base.py /path/to/documents --db inventory.db
python build_knowledge_base.py /path/to/documents --search "query text"
"""
import argparse
import os
from pathlib import Path
from tqdm import tqdm
def build_inventory(folder_path, db_path):
    """Walk ``folder_path`` recursively and register each PDF in the catalog.

    Files already present (matched by filepath) are left untouched, so the
    function can be re-run as new documents arrive.
    """
    db = create_database(db_path)
    cur = db.cursor()
    discovered = list(Path(folder_path).rglob("*.pdf"))
    print(f"Found {len(discovered)} PDF files")
    for path in tqdm(discovered, desc="Building inventory"):
        # Skip anything we have cataloged on a previous run.
        cur.execute('SELECT id FROM documents WHERE filepath = ?', (str(path),))
        if cur.fetchone() is not None:
            continue
        size_bytes = path.stat().st_size
        cur.execute('''
            INSERT INTO documents (filename, filepath, file_size, file_type)
            VALUES (?, ?, ?, 'pdf')
        ''', (path.name, str(path), size_bytes))
    db.commit()
    db.close()
def process_documents(db_path, use_ocr=True):
    """Run text extraction over every document whose extraction_method is
    still NULL, storing chunks and the per-document outcome.

    Returns a counter dict with keys 'text', 'ocr', 'failed' and 'drm'.
    """
    db = sqlite3.connect(db_path, timeout=30)
    cur = db.cursor()
    cur.execute('''
        SELECT id, filepath FROM documents
        WHERE extraction_method IS NULL
    ''')
    pending = cur.fetchall()
    tallies = {'text': 0, 'ocr': 0, 'failed': 0, 'drm': 0}
    for doc_id, filepath in tqdm(pending, desc="Extracting text"):
        text, method = extract_pdf_text(filepath)
        # OCR only when the PDF simply had no text layer.
        needs_ocr = text is None and use_ocr and method == "no_text"
        if needs_ocr:
            text, method = ocr_pdf(filepath)
        if text:
            for index, piece in enumerate(chunk_text(text)):
                cur.execute('''
                    INSERT OR IGNORE INTO text_chunks
                    (document_id, chunk_num, chunk_text, char_count)
                    VALUES (?, ?, ?, ?)
                ''', (doc_id, index, piece, len(piece)))
            tallies['text' if method == 'text' else 'ocr'] += 1
        elif 'drm' in method:
            tallies['drm'] += 1
        else:
            tallies['failed'] += 1
        # Record the outcome for resumability.
        cur.execute('''
            UPDATE documents SET extraction_method = ? WHERE id = ?
        ''', (method, doc_id))
    db.commit()
    db.close()
    return tallies
def main():
parser = argparse.ArgumentParser(description='Document RAG Pipeline')
parser.add_argument('folder', help='Folder containing documents')
parser.add_argument('--db', default='_inventory.db', help='Database path')
parser.add_argument('--no-ocr', action='store_true', help='Skip OCR')
parser.add_argument('--embed', action='store_true', help='Generate embeddings')
parser.add_argument('--search', help='Search query')
parser.add_argument('--top-k', type=int, default=10, help='Number of results')
args = parser.parse_args()
db_path = Path(args.folder) / args.db
if args.search:
# Search mode
results = semantic_search(str(db_path), args.search, args.top_k)
print(f"\nTop {len(results)} results for: '{args.search}'\n")
for i, r in enumerate(results, 1):
print(f"{i}. [{r['score']:.3f}] {r['filename']}")
print(f" {r['text'][:200]}...\n")
else:
# Build mode
print("Step 1: Building inventory...")
build_inventory(args.folder, str(db_path))
print("\nStep 2: Extracting text...")
stats = process_documents(str(db_path), use_ocr=not args.no_ocr)
print(f"Results: {stats}")
if args.embed:
print("\nStep 3: Generating embeddings...")
create_embeddings(str(db_path))
if __name__ == '__main__':
main()undefinedundefinedundefinedundefinedundefinedundefinedundefinedundefined#!/bin/bash#!/bin/bashundefinedundefinedCUDA_VISIBLE_DEVICES=""apt-get install tesseract-ocrbrew install tesseractextraction_method = 'drm_protected'timeout=30CUDA_VISIBLE_DEVICES=""apt-get install tesseract-ocrbrew install tesseractextraction_method = 'drm_protected'timeout=30| Metric | Typical Value |
|---|---|
| Text extraction | ~50 pages/second |
| OCR processing | ~2-5 pages/minute |
| Embedding generation | ~100 chunks/second (CPU) |
| Search latency | <2 seconds (50K chunks) |
| Memory usage | ~2GB for embeddings |
| 指标 | 典型值 |
|---|---|
| 文本提取速度 | ~50页/秒 |
| OCR处理速度 | ~2-5页/分钟 |
| 嵌入生成速度 | ~100块/秒(CPU) |
| 搜索延迟 | <2秒(50K块) |
| 内存占用 | ~2GB(嵌入数据) |
| Metric | Value |
|---|---|
| Total documents | 957 |
| Text extraction | 811 PDFs |
| OCR processed | 96 PDFs |
| DRM protected | 50 PDFs |
| Total chunks | 1,043,616 |
| Embedding time | ~4 hours (CPU) |
| Search latency | <2 seconds |
| 指标 | 数值 |
|---|---|
| 总文档数 | 957 |
| 文本提取成功 | 811份PDF |
| OCR处理 | 96份PDF |
| DRM保护 | 50份PDF |
| 总分块数 | 1,043,616 |
| 嵌入生成时间 | ~4小时(CPU) |
| 搜索延迟 | <2秒 |
pdf-text-extractor · semantic-search-setup · rag-system-builder · knowledge-base-builder