Data pipelines, feature stores, and embedding generation for AI/ML systems. Use when building RAG pipelines, serving ML features, or running data transformations. Covers feature stores (Feast, Tecton), embedding pipelines, chunking strategies, orchestration (Dagster, Prefect, Airflow), dbt transformations, data versioning (LakeFS), and experiment tracking (MLflow, W&B).
npx skill4agent add ancoleman/ai-design-components ai-data-engineering

┌────────────────────────────────────────────────────────────┐
│ RAG Pipeline (5 Stages)                                    │
├────────────────────────────────────────────────────────────┤
│                                                            │
│ 1. INGESTION  → Load documents (PDF, DOCX, Markdown)       │
│ 2. INDEXING   → Chunk (512 tokens) + Embed + Store         │
│ 3. RETRIEVAL  → Query embedding + Vector search + Filters  │
│ 4. GENERATION → Context injection + LLM streaming          │
│ 5. EVALUATION → RAGAS metrics (faithfulness, relevancy)    │
│                                                            │
└────────────────────────────────────────────────────────────┘

references/rag-architecture.md
examples/langchain-rag/basic_rag.py
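Stage 1 (ingestion) is the only stage without a snippet below, so here is a minimal loading sketch. It assumes the langchain-community document loaders (plus pypdf) are installed and that source files live under data/documents/; swap in Docx2txtLoader or other loaders for additional formats.

# Minimal ingestion sketch (assumes: pip install langchain-community pypdf)
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader, TextLoader

# Load PDFs and Markdown from a local directory (path is an assumption)
pdf_docs = DirectoryLoader("data/documents/", glob="**/*.pdf", loader_cls=PyPDFLoader).load()
md_docs = DirectoryLoader("data/documents/", glob="**/*.md", loader_cls=TextLoader).load()
documents = pdf_docs + md_docs  # List[Document], ready for the chunking step below
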
# Code-aware chunking (preserves functions/classes)
from langchain.text_splitter import Language, RecursiveCharacterTextSplitter
code_splitter = RecursiveCharacterTextSplitter.from_language(
    language=Language.PYTHON,
    chunk_size=512,
    chunk_overlap=50
)
# Semantic chunking (splits on meaning, not tokens)
from langchain_experimental.text_splitter import SemanticChunker  # requires langchain-experimental
semantic_splitter = SemanticChunker(
    embeddings=embeddings,
    breakpoint_threshold_type="percentile"  # Split at semantic boundaries
)

references/chunking-strategies.md

from langchain_voyageai import VoyageAIEmbeddings
from langchain_openai import OpenAIEmbeddings
# Production (best quality)
embeddings = VoyageAIEmbeddings(
    model="voyage-3",
    voyage_api_key="your-api-key"
)
# Development (cost-effective)
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    openai_api_key="your-api-key"
)

references/embedding-strategies.md

| Metric | Measures | Good Score |
|---|---|---|
| Faithfulness | Factual consistency with retrieved context | > 0.8 |
| Answer Relevancy | Does answer address the user's question? | > 0.7 |
| Context Precision | Are retrieved chunks actually relevant? | > 0.6 |
| Context Recall | Were all necessary chunks retrieved? | > 0.7 |
# Run RAGAS evaluation (TOKEN-FREE script execution)
python scripts/evaluate_rag.py --dataset eval_data.json --output results.json

from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy
from datasets import Dataset

# RAGAS expects a Hugging Face Dataset rather than a plain dict
dataset = Dataset.from_dict({
    "question": ["What is the capital of France?"],
    "answer": ["Paris is the capital of France."],
    "contexts": [["France's capital is Paris."]],
    "ground_truth": ["Paris"]
})
result = evaluate(dataset, metrics=[faithfulness, answer_relevancy])
print(f"Faithfulness: {result['faithfulness']}")
print(f"Answer Relevancy: {result['answer_relevancy']}")

references/evaluation-metrics.md

from feast import FeatureStore
store = FeatureStore(repo_path="feature_repo/")
# Online serving (low-latency)
features = store.get_online_features(
    features=["user_features:total_orders"],
    entity_rows=[{"user_id": 1001}]
).to_dict()
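The call above covers low-latency online serving; for building training sets, Feast can also join point-in-time correct historical feature values against an entity DataFrame. A minimal sketch, reusing the same store and a hypothetical two-user entity DataFrame:

import pandas as pd

# Offline retrieval for training data (entity DataFrame is hypothetical)
entity_df = pd.DataFrame({
    "user_id": [1001, 1002],
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-01-02"]),
})
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["user_features:total_orders"],
).to_df()
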
references/feature-stores.md

from langchain_core.prompts import ChatPromptTemplate
from langchain_qdrant import QdrantVectorStore
from langchain_voyageai import VoyageAIEmbeddings
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from qdrant_client import QdrantClient

# Setup retriever (assumes a local Qdrant instance and an existing "documents" collection)
qdrant_client = QdrantClient(url="http://localhost:6333")
vectorstore = QdrantVectorStore(
    client=qdrant_client,
    collection_name="documents",
    embedding=VoyageAIEmbeddings(model="voyage-3")
)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": 5})
# Build chain
prompt = ChatPromptTemplate.from_template(
    "Answer based on context:\n{context}\n\nQuestion: {question}"
)
chain = {"context": retriever, "question": lambda x: x} | prompt | ChatOpenAI() | StrOutputParser()
# Stream response
for chunk in chain.stream("What is the capital of France?"):
    print(chunk, end="", flush=True)

references/langchain-patterns.md

from dagster import asset
from langchain_voyageai import VoyageAIEmbeddings
@asset
def raw_documents():
    """Load documents from S3."""
    return documents  # placeholder: actual S3 loading code omitted

@asset
def chunked_documents(raw_documents):
    """Split into 512-token chunks with 50-token overlap."""
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=50)
    return splitter.split_documents(raw_documents)

@asset
def embedded_documents(chunked_documents):
    """Generate embeddings with Voyage AI."""
    embeddings = VoyageAIEmbeddings(model="voyage-3")
    return embeddings.embed_documents([doc.page_content for doc in chunked_documents])

references/orchestration-tools.md

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.chains import RetrievalQA
from langchain_openai import OpenAI

app = FastAPI()

@app.post("/api/rag/stream")
async def stream_rag(query: str):
    async def generate():
        # vectorstore defined as in the retrieval setup above
        chain = RetrievalQA.from_chain_type(llm=OpenAI(streaming=True), retriever=vectorstore.as_retriever())
        async for chunk in chain.astream(query):
            yield chunk["result"]
    return StreamingResponse(generate(), media_type="text/plain")

references/rag-architecture.md

from qdrant_client import QdrantClient
from langchain_voyageai import VoyageAIEmbeddings
@app.post("/api/search/semantic")
async def semantic_search(query: str, filters: dict):
    query_vector = VoyageAIEmbeddings(model="voyage-3").embed_query(query)
    results = QdrantClient().search(
        collection_name="documents",
        query_vector=query_vector,
        query_filter=filters,  # must follow Qdrant's Filter schema
        limit=10
    )
    return {"results": results}

import lakefs
repo = lakefs.repository("rag-data")  # hypothetical repository name
branch = repo.branch("experiment-voyage-3").create(source_reference="main")
branch.commit(message="Updated embeddings to voyage-3")
branch.merge_into(repo.branch("main"))

references/data-versioning.md

# Run Qdrant setup script (TOKEN-FREE execution)
python scripts/setup_qdrant.py --collection docs --dimension 1024

# Chunk documents (TOKEN-FREE execution)
python scripts/chunk_documents.py \
    --input data/documents/ \
    --chunk-size 512 \
    --overlap 50 \
    --output data/chunks/

examples/langchain-rag/basic_rag.py

# Run evaluation (TOKEN-FREE execution)
python scripts/evaluate_rag.py \
    --dataset data/eval_qa.json \
    --output results/ragas_metrics.json

examples/dagster-pipelines/embedding_pipeline.py

# Core RAG
pip install langchain langchain-core langchain-openai langchain-voyageai langchain-qdrant
# Vector database
pip install qdrant-client
# Evaluation
pip install ragas datasets
# Feature stores
pip install feast
# Orchestration
pip install dagster dagster-webserver
# Data versioning
pip install lakefs

# LlamaIndex (alternative to LangChain)
pip install llama-index
# dbt (SQL transformations)
pip install dbt-core dbt-postgres
# Prefect (alternative orchestration)
pip install prefect

references/rag-architecture.md
references/chunking-strategies.md
references/embedding-strategies.md
references/langchain-patterns.md
references/feature-stores.md
references/evaluation-metrics.md
examples/langchain-rag/basic_rag.py
examples/langchain-rag/streaming_rag.py
examples/langchain-rag/hybrid_search.py
examples/llamaindex-agents/query_engine.py
examples/feast-features/
examples/dagster-pipelines/embedding_pipeline.py
scripts/evaluate_rag.py
scripts/chunk_documents.py
scripts/benchmark_retrieval.py
scripts/setup_qdrant.py