ai-engineer-expert
AI Engineer Expert
Expert guidance for implementing AI systems, LLM integration, prompt engineering, and deploying production AI applications.
Core Concepts
AI Engineering
- LLM integration and orchestration
- Prompt engineering and optimization
- RAG (Retrieval-Augmented Generation)
- Vector databases and embeddings
- Fine-tuning and adaptation
- AI agent systems
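The "vector databases and embeddings" bullet boils down to nearest-neighbor search over dense vectors. A dependency-free sketch of cosine-similarity retrieval (toy 3-dimensional vectors stand in for real embeddings, which have hundreds to thousands of dimensions):

```python
import math

def cosine_similarity(a: list, b: list) -> float:
    """Cosine of the angle between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def top_k(query: list, docs: dict, k: int = 2) -> list:
    """Return the ids of the k document vectors most similar to the query."""
    ranked = sorted(docs, key=lambda d: cosine_similarity(query, docs[d]),
                    reverse=True)
    return ranked[:k]
```

A vector database applies the same ranking at scale with approximate-nearest-neighbor indexes instead of a full scan.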
Production AI
- Model deployment strategies
- API design for AI services
- Rate limiting and cost control
- Error handling and fallbacks
- Monitoring and logging
- Security and safety
LLM Patterns
- Chain-of-thought prompting
- Few-shot learning
- System/user message design
- Function calling and tools
- Streaming responses
- Context window management
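Context window management from the list above can be sketched as a token-budget trim over the message history (a rough chars/4 estimate stands in for a real tokenizer such as tiktoken, and the budget number is illustrative):

```python
def estimate_tokens(text: str) -> int:
    """Rough token estimate (~4 characters per token for English text)."""
    return max(1, len(text) // 4)

def trim_history(messages: list, budget: int = 3000) -> list:
    """Keep the system message plus the most recent turns that fit the budget."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    kept, used = [], sum(estimate_tokens(m["content"]) for m in system)
    for msg in reversed(rest):  # walk newest-first
        cost = estimate_tokens(msg["content"])
        if used + cost > budget:
            break
        kept.append(msg)
        used += cost
    return system + list(reversed(kept))
```

Dropping the oldest turns first preserves the system prompt and the recent context the model actually needs.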
LLM Integration
```python
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
from typing import Dict, List
import logging

logger = logging.getLogger(__name__)


class LLMClient:
    """Unified LLM client with provider fallback"""

    def __init__(self, primary: str = "openai", fallback: str = "anthropic"):
        self.openai_client = AsyncOpenAI()
        self.anthropic_client = AsyncAnthropic()
        self.primary = primary
        self.fallback = fallback

    async def chat_completion(self, messages: List[Dict],
                              model: str = "gpt-4-turbo",
                              temperature: float = 0.7,
                              max_tokens: int = 1000) -> str:
        """Chat completion with fallback to the secondary provider"""
        try:
            if self.primary != "openai":
                raise ValueError(f"Unsupported primary provider: {self.primary}")
            response = await self.openai_client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            return response.choices[0].message.content
        except Exception as exc:
            logger.warning("Primary provider failed (%s); trying fallback", exc)
            if self.fallback != "anthropic":
                raise
            # Note: the Anthropic API takes a system prompt via the `system`
            # parameter, not as a message with role "system".
            response = await self.anthropic_client.messages.create(
                model="claude-3-5-sonnet-20241022",
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
            return response.content[0].text

    async def chat_completion_streaming(self, messages: List[Dict],
                                        model: str = "gpt-4-turbo"):
        """Stream completion tokens as they arrive"""
        stream = await self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def function_calling(self, messages: List[Dict],
                               tools: List[Dict]) -> Dict:
        """Single completion turn with tool definitions attached"""
        response = await self.openai_client.chat.completions.create(
            model="gpt-4-turbo",
            messages=messages,
            tools=tools,
            tool_choice="auto",
        )
        message = response.choices[0].message
        if message.tool_calls:
            return {
                "type": "function_call",
                "function": message.tool_calls[0].function.name,
                "arguments": message.tool_calls[0].function.arguments,
            }
        return {"type": "message", "content": message.content}
```
RAG Implementation
```python
from typing import Dict, List
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA


class RAGSystem:
    """Retrieval-Augmented Generation system"""

    def __init__(self, persist_directory: str = "./chroma_db"):
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = None
        self.persist_directory = persist_directory
        self.llm = ChatOpenAI(model="gpt-4-turbo", temperature=0)

    def ingest_documents(self, documents: List[str]):
        """Split, embed, and index raw document strings"""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        chunks = text_splitter.create_documents(documents)
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.persist_directory,
        )

    def query(self, question: str, k: int = 4) -> Dict:
        """Answer a question using retrieved context"""
        if not self.vectorstore:
            raise ValueError("No documents ingested")
        retriever = self.vectorstore.as_retriever(search_kwargs={"k": k})
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=True,
        )
        result = qa_chain.invoke({"query": question})
        return {
            "answer": result["result"],
            "sources": [doc.page_content for doc in result["source_documents"]],
        }

    def similarity_search(self, query: str, k: int = 4) -> List[Dict]:
        """Raw similarity search against the vector store"""
        if not self.vectorstore:
            raise ValueError("No documents ingested")
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        return [
            {
                "content": doc.page_content,
                "score": score,
                "metadata": doc.metadata,
            }
            for doc, score in results
        ]
```
Prompt Engineering
```python
from typing import Dict, List


class PromptTemplate:
    """Advanced prompt templates"""

    @staticmethod
    def chain_of_thought(question: str) -> str:
        """Chain-of-thought prompting"""
        return f"""Let's solve this step by step:

Question: {question}

Please think through this problem carefully:
1. First, identify what we need to find
2. Then, break down the problem into smaller steps
3. Solve each step
4. Finally, combine the results

Your step-by-step solution:"""

    @staticmethod
    def few_shot(task: str, examples: List[Dict], query: str) -> str:
        """Few-shot learning prompt"""
        examples_text = "\n\n".join(
            f"Input: {ex['input']}\nOutput: {ex['output']}" for ex in examples
        )
        return f"""Task: {task}

Here are some examples:

{examples_text}

Now, please solve this:
Input: {query}
Output:"""

    @staticmethod
    def system_message(role: str, constraints: List[str],
                       format_instructions: str) -> str:
        """System message template"""
        constraints_text = "\n".join(f"- {c}" for c in constraints)
        return f"""You are a {role}.

Constraints:
{constraints_text}

Output Format:
{format_instructions}

Remember to follow these guidelines strictly."""
```
AI Agent System
```python
import json
from typing import Any, Callable, Dict, List


class Tool:
    """Tool that agents can use"""

    def __init__(self, name: str, description: str,
                 parameters: Dict, function: Callable):
        self.name = name
        self.description = description
        self.parameters = parameters  # JSON Schema for the tool's arguments
        self.function = function

    def to_openai_function(self) -> Dict:
        """Convert to the OpenAI tool definition format"""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            },
        }


class AIAgent:
    """AI agent with tools"""

    def __init__(self, llm_client: LLMClient, tools: List[Tool]):
        self.llm = llm_client
        self.tools = {tool.name: tool for tool in tools}
        self.conversation_history = []

    async def run(self, user_input: str, max_iterations: int = 10) -> str:
        """Run the agent loop until it answers or the iteration cap is hit"""
        self.conversation_history.append({
            "role": "user",
            "content": user_input,
        })
        for _ in range(max_iterations):
            # Get LLM response with function calling
            response = await self.llm.function_calling(
                messages=self.conversation_history,
                tools=[tool.to_openai_function() for tool in self.tools.values()],
            )
            if response["type"] == "message":
                return response["content"]  # agent is done
            # Execute the requested tool
            tool_name = response["function"]
            arguments = json.loads(response["arguments"])
            tool_result = await self.execute_tool(tool_name, arguments)
            # Simplified bookkeeping; the tools API strictly expects the
            # assistant's tool_calls message followed by a role="tool" reply.
            self.conversation_history.append({
                "role": "function",
                "name": tool_name,
                "content": str(tool_result),
            })
        return "Max iterations reached"

    async def execute_tool(self, tool_name: str, arguments: Dict) -> Any:
        """Execute a registered tool by name"""
        if tool_name not in self.tools:
            raise ValueError(f"Tool {tool_name} not found")
        return await self.tools[tool_name].function(**arguments)
```
Production Deployment
```python
import time
from typing import Dict, List
from fastapi import Depends, FastAPI, Header, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from circuitbreaker import circuit

app = FastAPI()


class ChatRequest(BaseModel):
    messages: List[Dict]
    model: str = "gpt-4-turbo"
    stream: bool = False


def get_user_id(x_user_id: str = Header(...)) -> str:
    """Resolve the caller's identity (stand-in for real authentication)."""
    return x_user_id


class RateLimiter:
    """Sliding-window rate limiter for the API"""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, List[float]] = {}

    async def check_limit(self, user_id: str) -> bool:
        """Return True if the user is within the rate limit"""
        now = time.time()
        window = self.requests.setdefault(user_id, [])
        # Drop requests that have aged out of the window
        window[:] = [t for t in window if now - t < self.window_seconds]
        if len(window) >= self.max_requests:
            return False
        window.append(now)
        return True


rate_limiter = RateLimiter(max_requests=100, window_seconds=60)
llm_client = LLMClient()


@circuit(failure_threshold=5, recovery_timeout=60)
async def call_llm(messages: List[Dict]) -> str:
    """LLM call wrapped in a circuit breaker"""
    return await llm_client.chat_completion(messages)


@app.post("/chat")
async def chat(request: ChatRequest, user_id: str = Depends(get_user_id)):
    """Chat endpoint with rate limiting"""
    if not await rate_limiter.check_limit(user_id):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    try:
        if request.stream:
            async def generate():
                async for chunk in llm_client.chat_completion_streaming(request.messages):
                    yield chunk
            return StreamingResponse(generate(), media_type="text/event-stream")
        response = await call_llm(request.messages)
        return {"response": response}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
```
Best Practices
LLM Integration
- Implement fallback providers
- Use streaming for better UX
- Cache responses where appropriate
- Handle rate limits gracefully
- Monitor token usage and costs
- Version prompts and track changes
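Caching can be as simple as keying on a hash of the request. An in-memory sketch (a production system would typically back this with Redis or similar; the TTL value is illustrative):

```python
import hashlib
import json
import time
from typing import Optional

class ResponseCache:
    """In-memory LLM response cache keyed on a hash of (model, messages)."""

    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (stored_at, response)

    def _key(self, model: str, messages: list) -> str:
        payload = json.dumps({"model": model, "messages": messages},
                             sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    def get(self, model: str, messages: list) -> Optional[str]:
        entry = self._store.get(self._key(model, messages))
        if entry is None:
            return None
        stored_at, response = entry
        if time.time() - stored_at > self.ttl:
            return None  # expired
        return response

    def put(self, model: str, messages: list, response: str) -> None:
        self._store[self._key(model, messages)] = (time.time(), response)
```

Check the cache before calling the provider and store the response afterwards; note that caching only pays off for deterministic or repeated prompts.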
Production Systems
- Implement circuit breakers
- Add comprehensive logging
- Monitor latency and errors
- Use rate limiting
- Implement retry logic with backoff
- Test edge cases thoroughly
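The "retry logic with backoff" item can be sketched as a small async helper (the delay constants are illustrative; jitter prevents synchronized retry storms):

```python
import asyncio
import random

async def retry_with_backoff(func, max_retries: int = 3,
                             base_delay: float = 1.0, max_delay: float = 30.0):
    """Retry an async callable with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries, surface the error
            # Exponential backoff capped at max_delay, randomized for jitter
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
```

In practice you would catch only transient error types (timeouts, 429s, 5xx) rather than bare `Exception`.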
Security
- Validate and sanitize inputs
- Implement authentication/authorization
- Never expose API keys in logs
- Use environment variables for secrets
- Implement content filtering
- Monitor for prompt injection
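A keyword heuristic is a crude but common first line of defense for prompt-injection monitoring (the phrase list here is illustrative, not exhaustive; production systems typically layer dedicated classifiers on top):

```python
import re

# Illustrative phrasings often seen in injection attempts; not exhaustive.
INJECTION_PATTERNS = [
    r"ignore (all |any )?(previous|prior|above) instructions",
    r"disregard (the )?system prompt",
    r"reveal your (system )?prompt",
]

def looks_like_injection(user_input: str) -> bool:
    """Flag inputs that match common prompt-injection phrasings."""
    lowered = user_input.lower()
    return any(re.search(p, lowered) for p in INJECTION_PATTERNS)
```

Flagged inputs can be logged for review or routed through stricter handling rather than rejected outright, since heuristics produce false positives.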
Anti-Patterns
❌ No error handling or fallbacks
❌ Exposing raw LLM outputs without validation
❌ No rate limiting or cost controls
❌ Storing API keys in code
❌ No monitoring or logging
❌ Ignoring token limits
❌ No testing of prompts
Resources
- OpenAI API: https://platform.openai.com/docs
- Anthropic Claude: https://docs.anthropic.com/
- LangChain: https://python.langchain.com/
- LlamaIndex: https://www.llamaindex.ai/
- Weights & Biases Prompts: https://wandb.ai/site/prompts