production-grade-agentic-system

Production-Grade Agentic System

Skill by ara.so — AI Agent Skills collection.
A comprehensive framework for building production-ready multi-agent AI systems around 7 core architectural layers: modular codebase, data persistence, security & safeguards, service layer, multi-agent orchestration, API gateway, and observability. It is built with FastAPI, LangGraph, and PostgreSQL, and includes monitoring, evaluation, and stress testing.

Installation

Prerequisites

  • Python ≥3.13
  • PostgreSQL database
  • Docker and Docker Compose (for containerized deployment)

Clone and Setup

bash
git clone https://github.com/FareedKhan-dev/production-grade-agentic-system
cd production-grade-agentic-system

# Install dependencies
pip install -e .

# Install development dependencies
pip install -e .[dev]

# Install testing dependencies
pip install -e .[test]

Environment Configuration

Create a .env file in the project root:
bash
# Database
DATABASE_URL=postgresql://user:password@localhost:5432/agentic_db
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_KEY=your-supabase-anon-key

# OpenAI
OPENAI_API_KEY=your-openai-api-key

# Authentication
SECRET_KEY=your-secret-key-min-32-chars
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

# LangFuse (Observability)
LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
LANGFUSE_SECRET_KEY=your-langfuse-secret-key
LANGFUSE_HOST=https://cloud.langfuse.com

# Rate Limiting
RATE_LIMIT_PER_MINUTE=60

# Application
APP_ENV=development
LOG_LEVEL=INFO
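
The variables above are only useful if the application checks them at startup. The following is a minimal sketch of such a check, using only the standard library. The `Settings` dataclass and `load_settings` function are hypothetical names, not part of the repository, and the choice of which variables are required is an assumption. The 32-character minimum mirrors the `your-secret-key-min-32-chars` placeholder.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Typed view over a subset of the .env variables (hypothetical helper)."""
    database_url: str
    secret_key: str
    algorithm: str
    access_token_expire_minutes: int
    rate_limit_per_minute: int
    app_env: str
    log_level: str


def load_settings(env: Mapping[str, str]) -> Settings:
    """Build Settings from an environment mapping, failing fast on bad values."""
    # Assumption: only these two variables are treated as mandatory here.
    missing = [key for key in ("DATABASE_URL", "SECRET_KEY") if not env.get(key)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    if len(env["SECRET_KEY"]) < 32:
        raise ValueError("SECRET_KEY must be at least 32 characters")
    return Settings(
        database_url=env["DATABASE_URL"],
        secret_key=env["SECRET_KEY"],
        algorithm=env.get("ALGORITHM", "HS256"),
        access_token_expire_minutes=int(env.get("ACCESS_TOKEN_EXPIRE_MINUTES", "30")),
        rate_limit_per_minute=int(env.get("RATE_LIMIT_PER_MINUTE", "60")),
        app_env=env.get("APP_ENV", "development"),
        log_level=env.get("LOG_LEVEL", "INFO"),
    )
```

Calling `load_settings(os.environ)` once at startup would surface a missing or too-short secret before the first request is served, rather than at the first login.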

Docker Deployment

bash
# Start all services (app, postgres, prometheus, grafana)
docker-compose up -d

# View logs
docker-compose logs -f app

# Stop services
docker-compose down

Project Structure

The system follows a modular architecture with clear separation of concerns:
app/
├── api/v1/          # API route handlers
├── core/            # Core application logic
│   ├── langgraph/   # Agent orchestration
│   │   └── tools/   # Agent tools (search, actions)
│   └── prompts/     # System and agent prompts
├── models/          # SQLModel database models
├── schemas/         # Pydantic validation schemas
├── services/        # Business logic layer
└── utils/           # Shared utilities

evals/               # Evaluation framework
├── metrics/         # Evaluation criteria
└── prompts/         # LLM-as-a-Judge prompts

grafana/             # Observability dashboards
prometheus/          # Metrics configuration

Core Components

1. Database Models (SQLModel)

python
from sqlmodel import SQLModel, Field
from datetime import datetime
from typing import Optional

class User(SQLModel, table=True):
    __tablename__ = "users"
    
    id: Optional[int] = Field(default=None, primary_key=True)
    email: str = Field(unique=True, index=True)
    hashed_password: str
    is_active: bool = Field(default=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    
class Conversation(SQLModel, table=True):
    __tablename__ = "conversations"
    
    id: Optional[int] = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="users.id")
    thread_id: str = Field(unique=True, index=True)
    title: Optional[str] = None
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

2. Pydantic Schemas (DTOs)

python
from pydantic import BaseModel, EmailStr
from typing import Optional

class UserCreate(BaseModel):
    email: EmailStr
    password: str

class UserResponse(BaseModel):
    id: int
    email: str
    is_active: bool
    
    model_config = {"from_attributes": True}

class ChatRequest(BaseModel):
    message: str
    thread_id: Optional[str] = None

class ChatResponse(BaseModel):
    response: str
    thread_id: str

3. Security & Authentication

python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
import os

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()

SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = os.getenv("ALGORITHM", "HS256")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: int = payload.get("sub")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Invalid token")
        return user_id
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
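
To make the token handling above less opaque, here is a standard-library sketch of what an HS256-signed JWT consists of: two base64url-encoded JSON segments plus an HMAC-SHA256 signature over them. It is for illustration only and is not how python-jose is implemented. The function names are hypothetical, and the sketch skips checks a real library performs, such as validating the `alg` header.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 with the trailing "=" padding removed
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped during encoding
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Return header.payload.signature for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Check signature and expiry; raise ValueError if either fails."""
    # Unpacking raises ValueError when the token does not have three segments
    header_b64, payload_b64, signature_b64 = token.split(".")
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    # compare_digest avoids leaking information through comparison timing
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("Bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and payload["exp"] < current:
        raise ValueError("Token expired")
    return payload
```

The `now` parameter exists only so expiry can be exercised deterministically. In the application itself, the `jwt.encode` and `jwt.decode` calls shown earlier remain the right tool.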

4. Rate Limiting

python
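from slowapi import Limiter
from slowapi.util import get_remote_address
from fastapi import Depends, Request

limiter = Limiter(key_func=get_remote_address)
# slowapi's setup expects the limiter to be registered on app.state
app.state.limiter = limiter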

@app.post("/api/v1/chat")
@limiter.limit("60/minute")
async def chat_endpoint(
    request: Request,
    chat_request: ChatRequest,
    user_id: int = Depends(get_current_user)
):
    # Handle chat request
    pass
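
To show what a limit such as `60/minute` per client address means in practice, the following is a simplified sliding-window limiter in plain Python. It is a sketch of the general technique, not slowapi's implementation. The class name is hypothetical, and the clock is passed in explicitly so the behaviour is easy to test.

```python
from collections import defaultdict, deque
from typing import Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` hits per key within any `window_seconds` span."""

    def __init__(self, limit: int, window_seconds: float) -> None:
        self.limit = limit
        self.window_seconds = window_seconds
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str, now: float) -> bool:
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.limit:
            return False  # over the limit: the caller would answer with HTTP 429
        hits.append(now)
        return True
```

Each key (for example a client IP) gets its own window, so one noisy client does not consume another client's budget.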

5. LangGraph Agent with Tools

python
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools import DuckDuckGoSearchRun
from typing import TypedDict, Annotated, Sequence
import operator

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next: str

# Define agent tools
search_tool = DuckDuckGoSearchRun()
tools = [search_tool]

# Create LLM with tools
llm = ChatOpenAI(model="gpt-4", temperature=0)
llm_with_tools = llm.bind_tools(tools)

# Define agent node
def agent_node(state: AgentState):
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

# Define tool execution node
def tool_node(state: AgentState):
    messages = state["messages"]
    last_message = messages[-1]

    # Execute tool calls; each result goes back as a ToolMessage tied to its call ID
    tool_outputs = []
    for tool_call in last_message.tool_calls:
        tool_result = search_tool.run(tool_call["args"])
        tool_outputs.append(
            ToolMessage(content=str(tool_result), tool_call_id=tool_call["id"])
        )

    return {"messages": tool_outputs}

# Build graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

# Define routing logic
def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return END

workflow.set_entry_point("agent")
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
workflow.add_edge("tools", "agent")

agent = workflow.compile()
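
The control flow that the graph encodes (agent, then tools if requested, then back to the agent until no tool calls remain) can be traced without any LLM. The sketch below is a plain-Python simulation under stated assumptions: `fake_model` and `fake_search` are hypothetical stand-ins for `llm_with_tools` and the search tool, and messages are simple dictionaries rather than LangChain message objects.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]


def fake_model(messages: List[Message]) -> Message:
    """Stand-in for llm_with_tools: request one search, then answer."""
    if not any(m["role"] == "tool" for m in messages):
        call = {"id": "call_1", "name": "search", "args": {"query": "langgraph"}}
        return {"role": "ai", "content": "", "tool_calls": [call]}
    return {"role": "ai", "content": "final answer", "tool_calls": []}


def fake_search(args: Dict[str, Any]) -> str:
    """Stand-in for the search tool."""
    return f"results for {args['query']}"


def run_loop(user_message: str, max_steps: int = 5) -> List[Message]:
    messages: List[Message] = [{"role": "human", "content": user_message}]
    for _ in range(max_steps):
        reply = fake_model(messages)        # the "agent" node
        messages.append(reply)
        if not reply["tool_calls"]:         # should_continue returns END
            break
        for call in reply["tool_calls"]:    # the "tools" node
            messages.append({
                "role": "tool",
                "content": fake_search(call["args"]),
                "tool_call_id": call["id"],  # ties the result to its request
            })
    return messages
```

The `max_steps` bound is an addition of this sketch. It guards against a model that keeps requesting tools indefinitely.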

6. Memory Management with Checkpointing

python
import os

from langchain_core.messages import HumanMessage
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg2 import pool

# Create connection pool
# Note: recent langgraph-checkpoint-postgres releases are built on psycopg 3, so this
# psycopg2 pool may need to be swapped for a psycopg_pool.ConnectionPool.
connection_pool = pool.SimpleConnectionPool(
    1, 20,
    dsn=os.getenv("DATABASE_URL")
)

# Create checkpointer
checkpointer = PostgresSaver(connection_pool)

# Compile agent with memory
agent_with_memory = workflow.compile(checkpointer=checkpointer)

# Use agent with thread ID for conversation persistence
async def chat_with_memory(message: str, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}

    result = await agent_with_memory.ainvoke(
        {"messages": [HumanMessage(content=message)]},
        config=config
    )

    return result["messages"][-1].content
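
The role of `thread_id` is easiest to see with a toy store: state is saved and loaded per thread, so two conversations never see each other's history. The sketch below uses an in-memory dictionary in place of PostgreSQL. `InMemoryThreadStore` and `chat` are hypothetical names, and the reply text is a placeholder for a real agent response.

```python
from collections import defaultdict
from typing import Dict, List


class InMemoryThreadStore:
    """Toy replacement for a checkpointer: one message list per thread_id."""

    def __init__(self) -> None:
        self._threads: Dict[str, List[str]] = defaultdict(list)

    def load(self, thread_id: str) -> List[str]:
        # Return a copy so callers cannot mutate stored state by accident
        return list(self._threads[thread_id])

    def append(self, thread_id: str, message: str) -> None:
        self._threads[thread_id].append(message)


def chat(store: InMemoryThreadStore, thread_id: str, message: str) -> str:
    store.append(thread_id, message)
    history = store.load(thread_id)
    # Placeholder reply: a real agent would pass `history` to the model
    return f"message {len(history)} in thread {thread_id}"
```

A database-backed checkpointer plays the same role, with the difference that the history survives process restarts and is shared between workers.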

7. FastAPI Application with Streaming

python
from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
import json

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Starting application...")
    yield
    # Shutdown
    print("Shutting down application...")

app = FastAPI(lifespan=lifespan)

@app.post("/api/v1/chat/stream")
async def chat_stream(
    chat_request: ChatRequest,
    user_id: int = Depends(get_current_user)
):
    async def event_generator():
        config = {
            "configurable": {
                "thread_id": chat_request.thread_id or f"user-{user_id}"
            }
        }
        
        async for event in agent_with_memory.astream_events(
            {"messages": [HumanMessage(content=chat_request.message)]},
            config=config,
            version="v1"
        ):
            if event["event"] == "on_chat_model_stream":
                chunk = event["data"]["chunk"]
                if chunk.content:
                    yield f"data: {json.dumps({'content': chunk.content})}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
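
On the client side, the `data: {...}` frames emitted by the endpoint above have to be reassembled, because network chunks do not necessarily align with event boundaries. The following sketch shows the framing and a small buffered parser. Both function names are hypothetical, and only the `data:` field of the server-sent events format is handled.

```python
import json
from typing import Iterable, Iterator


def format_sse(content: str) -> str:
    """Frame one chunk the same way the streaming endpoint does."""
    return f"data: {json.dumps({'content': content})}\n\n"


def parse_sse(stream: Iterable[str]) -> Iterator[str]:
    """Yield the 'content' field of each complete event in a stream of text chunks."""
    buffer = ""
    for chunk in stream:
        buffer += chunk
        # A blank line terminates an event; anything after it stays buffered
        while "\n\n" in buffer:
            event, buffer = buffer.split("\n\n", 1)
            for line in event.split("\n"):
                if line.startswith("data: "):
                    yield json.loads(line[len("data: "):])["content"]
```

Because `json.dumps` escapes newlines inside the payload, a blank line can only ever mark the end of an event, which is what makes the simple split on `\n\n` safe here.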

8. Observability with LangFuse

python
from langfuse.callback import CallbackHandler
import os

langfuse_handler = CallbackHandler(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
    host=os.getenv("LANGFUSE_HOST")
)

# Use with LangChain
async def chat_with_tracing(message: str, thread_id: str):
    config = {
        "configurable": {"thread_id": thread_id},
        "callbacks": [langfuse_handler]
    }

    result = await agent_with_memory.ainvoke(
        {"messages": [HumanMessage(content=message)]},
        config=config
    )

    return result

9. Prometheus Metrics

python
from prometheus_client import Counter, Histogram
from starlette_prometheus import metrics, PrometheusMiddleware

app.add_middleware(PrometheusMiddleware)
app.add_route("/metrics", metrics)

# Custom metrics
chat_requests = Counter(
    "chat_requests_total",
    "Total number of chat requests",
    ["user_id", "status"]
)

chat_latency = Histogram(
    "chat_latency_seconds",
    "Chat request latency in seconds"
)

@app.post("/api/v1/chat")
async def chat(
    chat_request: ChatRequest,
    user_id: int = Depends(get_current_user)
):
    with chat_latency.time():
        try:
            response = await chat_with_memory(
                chat_request.message,
                chat_request.thread_id or f"user-{user_id}"
            )
            chat_requests.labels(user_id=user_id, status="success").inc()
            return {"response": response}
        except Exception:
            chat_requests.labels(user_id=user_id, status="error").inc()
            raise

10. Circuit Breaker Pattern

python
import time
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from typing import Optional

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time: Optional[float] = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise
    
    def on_success(self):
        self.failure_count = 0
        self.state = "CLOSED"
    
    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

# Usage
llm_circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type(Exception)
)
async def call_llm_with_retry(message: str):
    return await llm_circuit_breaker.call(llm.ainvoke, message)
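
The state transitions of the breaker (CLOSED to OPEN after repeated failures, OPEN to HALF_OPEN after the timeout, and back to CLOSED on a successful trial call) can be checked with a synchronous variant that takes an injectable clock. This is a sketch for tracing the logic, not a drop-in replacement for the class above. `SketchBreaker` and its `clock` parameter are hypothetical.

```python
from typing import Any, Callable, Optional


class SketchBreaker:
    """Synchronous circuit breaker with an injectable clock, for illustration."""

    def __init__(self, failure_threshold: int, timeout: float, clock: Callable[[], float]) -> None:
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "CLOSED"

    def call(self, func: Callable[[], Any]) -> Any:
        if self.state == "OPEN":
            if self.clock() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"  # let one trial call through
            else:
                raise RuntimeError("Circuit breaker is OPEN")
        try:
            result = func()
        except Exception:
            self.failure_count += 1
            self.last_failure_time = self.clock()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
            raise
        # Any success resets the breaker
        self.failure_count = 0
        self.state = "CLOSED"
        return result
```

With a fake clock, a test can open the breaker, confirm that calls are rejected without touching the wrapped function, advance time past the timeout, and observe the recovery, all without sleeping.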

Evaluation Framework

LLM-as-a-Judge

python
from langchain_openai import ChatOpenAI
from pydantic import BaseModel

class EvaluationResult(BaseModel):
    score: float
    reasoning: str

eval_llm = ChatOpenAI(model="gpt-4", temperature=0)

EVAL_PROMPT = """
Evaluate the following AI agent response based on these criteria:
- Accuracy: Is the response factually correct?
- Relevance: Does it address the user's question?
- Completeness: Does it provide a thorough answer?
- Safety: Is the response safe and appropriate?

User Query: {query}
Agent Response: {response}

Provide a score from 0-10 and explain your reasoning.
"""

async def evaluate_response(query: str, response: str) -> EvaluationResult:
    prompt = EVAL_PROMPT.format(query=query, response=response)
    result = await eval_llm.ainvoke(prompt)
    
    # Parse LLM output into structured result.
    # Implementation depends on the LLM output format; the score below is a
    # hard-coded placeholder, not a value derived from the judge's reply.
    return EvaluationResult(score=8.5, reasoning=result.content)
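
The parsing step that the evaluation function leaves open could, under one set of assumptions, look like the sketch below. It assumes the judge writes something like `Score: 8.5` in free text, which the prompt does not guarantee. `parse_score` is a hypothetical helper, and returning `None` for missing or out-of-range values is a design choice of this sketch.

```python
import re
from typing import Optional

# Matches "score", an optional ":" or "=", then an integer or decimal number
_SCORE_RE = re.compile(r"score\s*[:=]?\s*(\d+(?:\.\d+)?)", re.IGNORECASE)


def parse_score(text: str, low: float = 0.0, high: float = 10.0) -> Optional[float]:
    """Extract the first 'Score: N' value, or None if absent or out of range."""
    match = _SCORE_RE.search(text)
    if match is None:
        return None
    value = float(match.group(1))
    return value if low <= value <= high else None
```

Returning `None` instead of a default keeps unparseable judge replies visible, so they can be counted or retried rather than silently averaged into the results.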

Testing

Unit Tests

python
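import pytest
from httpx import ASGITransport, AsyncClient

from app.main import app

@pytest.mark.asyncio
async def test_chat_endpoint():
    # Recent httpx releases route requests to an ASGI app through ASGITransport
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: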
        # Login
        response = await client.post(
            "/api/v1/auth/login",
            json={"email": "test@example.com", "password": "testpass"}
        )
        token = response.json()["access_token"]
        
        # Chat
        response = await client.post(
            "/api/v1/chat",
            json={"message": "Hello, agent!"},
            headers={"Authorization": f"Bearer {token}"}
        )
        
        assert response.status_code == 200
        assert "response" in response.json()

Load Testing

python
import asyncio
from httpx import AsyncClient
import time

async def simulate_user(client: AsyncClient, token: str, num_requests: int):
    for i in range(num_requests):
        await client.post(
            "/api/v1/chat",
            json={"message": f"Test message {i}"},
            headers={"Authorization": f"Bearer {token}"}
        )

async def load_test(num_users: int, requests_per_user: int):
    start_time = time.time()
    
    async with AsyncClient(base_url="http://localhost:8000") as client:
        # Create tasks for concurrent users
        tasks = [
            simulate_user(client, "test_token", requests_per_user)
            for _ in range(num_users)
        ]
        await asyncio.gather(*tasks)
    
    duration = time.time() - start_time
    total_requests = num_users * requests_per_user
    print(f"Completed {total_requests} requests in {duration:.2f}s")
    print(f"Throughput: {total_requests/duration:.2f} req/s")

# Run load test
asyncio.run(load_test(num_users=100, requests_per_user=10))
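
Throughput alone hides slow outliers, so a load test usually also reports latency percentiles. The sketch below computes them with the nearest-rank method from a list of per-request durations. It assumes those durations were recorded during the run, which the load test above does not do. `percentile` and `summarize` are hypothetical helpers.

```python
import math
from typing import Dict, Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of values at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def summarize(latencies: Sequence[float]) -> Dict[str, float]:
    """Return the figures most load-test reports lead with."""
    return {
        "p50": percentile(latencies, 50),
        "p95": percentile(latencies, 95),
        "max": max(latencies),
    }
```

To use it, `simulate_user` would need to time each request (for example with `time.perf_counter()`) and append the duration to a shared list.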

Common Patterns

Context Management

python
import uuid
from contextvars import ContextVar
from typing import Optional

from fastapi import Request

request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)

@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)
    
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response
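
The reason a `ContextVar` works for request IDs is that each asyncio task runs in its own copy of the context, so concurrent requests do not overwrite each other's value. The sketch below demonstrates this with two overlapping tasks. `handle` and `main` are hypothetical names used only for the demonstration.

```python
import asyncio
import uuid
from contextvars import ContextVar
from typing import List, Optional, Tuple

request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)


async def handle(name: str, delay: float) -> Tuple[str, Optional[str]]:
    # Each task sets its own value...
    request_id_var.set(f"{name}-{uuid.uuid4().hex[:8]}")
    await asyncio.sleep(delay)  # ...other tasks run while this one waits...
    # ...and still reads back the value it set, not the other task's.
    return name, request_id_var.get()


async def main() -> List[Tuple[str, Optional[str]]]:
    # gather wraps each coroutine in a task with its own copy of the context
    return await asyncio.gather(handle("a", 0.02), handle("b", 0.01))
```

Running `asyncio.run(main())` returns one pair per task, each carrying the ID that the same task set, even though the two tasks interleave.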

Structured Logging

python
import structlog

logger = structlog.get_logger()

async def process_chat(message: str, user_id: int):
    logger.info(
        "chat_request_received",
        user_id=user_id,
        message_length=len(message)
    )
    
    try:
        response = await chat_with_memory(message, f"user-{user_id}")
        logger.info("chat_response_generated", user_id=user_id)
        return response
    except Exception as e:
        logger.error(
            "chat_processing_failed",
            user_id=user_id,
            error=str(e),
            exc_info=True
        )
        raise

Troubleshooting

Database Connection Issues

python
import os

from sqlalchemy import text
from sqlmodel import Session, create_engine

# Check connection pool health
engine = create_engine(os.getenv("DATABASE_URL"), pool_pre_ping=True)

def check_db_health():
    try:
        with Session(engine) as session:
            # Raw SQL strings must be wrapped in text() under SQLAlchemy 2.x
            session.exec(text("SELECT 1"))
        return True
    except Exception as e:
        logger.error("database_health_check_failed", error=str(e))
        return False

LLM Timeout Handling

python
import asyncio

async def call_llm_with_timeout(message: str, timeout: int = 30):
    try:
        return await asyncio.wait_for(
            llm.ainvoke(message),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        logger.error("llm_call_timeout", timeout=timeout)
        raise HTTPException(status_code=504, detail="LLM request timeout")
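
The behaviour of `asyncio.wait_for` can be verified without a real model by substituting a coroutine with a controllable delay. In this sketch, `slow_model` and `call_with_timeout` are hypothetical stand-ins, and the timeout is mapped to a sentinel string rather than an HTTP 504 so that the example stays free of web framework imports.

```python
import asyncio


async def slow_model(prompt: str, delay: float) -> str:
    """Stand-in for llm.ainvoke that takes `delay` seconds to answer."""
    await asyncio.sleep(delay)
    return f"echo: {prompt}"


async def call_with_timeout(prompt: str, delay: float, timeout: float) -> str:
    try:
        # wait_for cancels the inner coroutine once the timeout elapses
        return await asyncio.wait_for(slow_model(prompt, delay), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"
```

Because the pending call is cancelled rather than abandoned, a timed-out request does not keep running in the background.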

Memory Leak Prevention

python
from datetime import datetime, timedelta

from sqlmodel import Session, select

# Clear old conversations periodically
async def cleanup_old_conversations():
    cutoff_date = datetime.utcnow() - timedelta(days=30)

    with Session(engine) as session:
        # .all() materializes the rows before any of them are deleted
        old_conversations = session.exec(
            select(Conversation).where(Conversation.updated_at < cutoff_date)
        ).all()

        for conv in old_conversations:
            session.delete(conv)

        session.commit()

Rate Limit Debugging

python
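from fastapi import Request
from fastapi.responses import JSONResponse
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address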

@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    logger.warning(
        "rate_limit_exceeded",
        client_ip=get_remote_address(request),
        path=request.url.path
    )
    return JSONResponse(
        status_code=429,
        content={"detail": "Rate limit exceeded. Please try again later."}
    )

Running the Application

Development Mode

bash
# Run with auto-reload
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# Run with specific log level
uvicorn app.main:app --log-level debug

Production Mode

bash
# Run with uvloop and multiple workers
uvicorn app.main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --loop uvloop \
  --log-config logging.yaml

Accessing Services

Additional Resources
