production-grade-agentic-system
Original:🇺🇸 English
Translated
Build production-ready multi-agent AI systems with security, observability, and scalability using LangGraph and FastAPI
2installs
Sourcearadotso/ai-agent-skills
Added on
NPX Install
npx skill4agent add aradotso/ai-agent-skills production-grade-agentic-systemTags
Translated version includes tags in frontmatterSKILL.md Content
View Translation Comparison →Production-Grade Agentic System
Skill by ara.so — AI Agent Skills collection.
A comprehensive framework for building production-ready multi-agent AI systems with 7 core architectural layers: modular codebase, data persistence, security & safeguards, service layer, multi-agent orchestration, API gateway, and observability. Built with FastAPI, LangGraph, PostgreSQL, and includes monitoring, evaluation, and stress testing.
Installation
Prerequisites
- Python ≥3.13
- PostgreSQL database
- Docker and Docker Compose (for containerized deployment)
Clone and Setup
bash
git clone https://github.com/FareedKhan-dev/production-grade-agentic-system
cd production-grade-agentic-system
# Install dependencies
pip install -e .
# Install development dependencies
pip install -e .[dev]
# Install testing dependencies
pip install -e .[test]Environment Configuration
Create a file in the project root:
.envbash
# Database
DATABASE_URL=postgresql://user:password@localhost:5432/agentic_db
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_KEY=your-supabase-anon-key
# OpenAI
OPENAI_API_KEY=your-openai-api-key
# Authentication
SECRET_KEY=your-secret-key-min-32-chars
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
# LangFuse (Observability)
LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
LANGFUSE_SECRET_KEY=your-langfuse-secret-key
LANGFUSE_HOST=https://cloud.langfuse.com
# Rate Limiting
RATE_LIMIT_PER_MINUTE=60
# Application
APP_ENV=development
LOG_LEVEL=INFODocker Deployment
bash
# Start all services (app, postgres, prometheus, grafana)
docker-compose up -d
# View logs
docker-compose logs -f app
# Stop services
docker-compose downProject Structure
The system follows a modular architecture with clear separation of concerns:
app/
├── api/v1/ # API route handlers
├── core/ # Core application logic
│ ├── langgraph/ # Agent orchestration
│ │ └── tools/ # Agent tools (search, actions)
│ └── prompts/ # System and agent prompts
├── models/ # SQLModel database models
├── schemas/ # Pydantic validation schemas
├── services/ # Business logic layer
└── utils/ # Shared utilities
evals/ # Evaluation framework
├── metrics/ # Evaluation criteria
└── prompts/ # LLM-as-a-Judge prompts
grafana/ # Observability dashboards
prometheus/ # Metrics configurationCore Components
1. Database Models (SQLModel)
python
from sqlmodel import SQLModel, Field
from datetime import datetime
from typing import Optional
class User(SQLModel, table=True):
__tablename__ = "users"
id: Optional[int] = Field(default=None, primary_key=True)
email: str = Field(unique=True, index=True)
hashed_password: str
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.utcnow)
class Conversation(SQLModel, table=True):
__tablename__ = "conversations"
id: Optional[int] = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="users.id")
thread_id: str = Field(unique=True, index=True)
title: Optional[str] = None
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)2. Pydantic Schemas (DTOs)
python
from pydantic import BaseModel, EmailStr
from typing import Optional
class UserCreate(BaseModel):
email: EmailStr
password: str
class UserResponse(BaseModel):
id: int
email: str
is_active: bool
model_config = {"from_attributes": True}
class ChatRequest(BaseModel):
message: str
thread_id: Optional[str] = None
class ChatResponse(BaseModel):
response: str
thread_id: str3. Security & Authentication
python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
import os
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token")
return user_id
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")4. Rate Limiting
python
from slowapi import Limiter
from slowapi.util import get_remote_address
from fastapi import Request
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/v1/chat")
@limiter.limit("60/minute")
async def chat_endpoint(
request: Request,
chat_request: ChatRequest,
user_id: int = Depends(get_current_user)
):
# Handle chat request
pass5. LangGraph Agent with Tools
python
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, Sequence
import operator
class AgentState(TypedDict):
messages: Annotated[Sequence[HumanMessage | AIMessage], operator.add]
next: str
# Define agent tools
from langchain_community.tools import DuckDuckGoSearchRun
search_tool = DuckDuckGoSearchRun()
tools = [search_tool]
# Create LLM with tools
llm = ChatOpenAI(model="gpt-4", temperature=0)
llm_with_tools = llm.bind_tools(tools)
# Define agent node
def agent_node(state: AgentState):
messages = state["messages"]
response = llm_with_tools.invoke(messages)
return {"messages": [response]}
# Define tool execution node
def tool_node(state: AgentState):
messages = state["messages"]
last_message = messages[-1]
# Execute tool calls
tool_outputs = []
for tool_call in last_message.tool_calls:
tool_result = search_tool.run(tool_call["args"])
tool_outputs.append(AIMessage(content=tool_result))
return {"messages": tool_outputs}
# Build graph
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
# Define routing logic
def should_continue(state: AgentState):
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
return END
workflow.set_entry_point("agent")
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
workflow.add_edge("tools", "agent")
agent = workflow.compile()6. Memory Management with Checkpointing
python
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg2 import pool
# Create connection pool
connection_pool = pool.SimpleConnectionPool(
1, 20,
dsn=os.getenv("DATABASE_URL")
)
# Create checkpointer
checkpointer = PostgresSaver(connection_pool)
# Compile agent with memory
agent_with_memory = workflow.compile(checkpointer=checkpointer)
# Use agent with thread ID for conversation persistence
async def chat_with_memory(message: str, thread_id: str):
config = {"configurable": {"thread_id": thread_id}}
result = await agent_with_memory.ainvoke(
{"messages": [HumanMessage(content=message)]},
config=config
)
return result["messages"][-1].content7. FastAPI Application with Streaming
python
from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
from contextlib import asynccontextmanager
import json
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print("Starting application...")
yield
# Shutdown
print("Shutting down application...")
app = FastAPI(lifespan=lifespan)
@app.post("/api/v1/chat/stream")
async def chat_stream(
chat_request: ChatRequest,
user_id: int = Depends(get_current_user)
):
async def event_generator():
config = {
"configurable": {
"thread_id": chat_request.thread_id or f"user-{user_id}"
}
}
async for event in agent_with_memory.astream_events(
{"messages": [HumanMessage(content=chat_request.message)]},
config=config,
version="v1"
):
if event["event"] == "on_chat_model_stream":
chunk = event["data"]["chunk"]
if chunk.content:
yield f"data: {json.dumps({'content': chunk.content})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)8. Observability with LangFuse
python
from langfuse.callback import CallbackHandler
import os
langfuse_handler = CallbackHandler(
public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
host=os.getenv("LANGFUSE_HOST")
)
# Use with LangChain
async def chat_with_tracing(message: str, thread_id: str):
config = {
"configurable": {"thread_id": thread_id},
"callbacks": [langfuse_handler]
}
result = await agent_with_memory.ainvoke(
{"messages": [HumanMessage(content=message)]},
config=config
)
return result9. Prometheus Metrics
python
from prometheus_client import Counter, Histogram
from starlette_prometheus import metrics, PrometheusMiddleware
app.add_middleware(PrometheusMiddleware)
app.add_route("/metrics", metrics)
# Custom metrics
chat_requests = Counter(
"chat_requests_total",
"Total number of chat requests",
["user_id", "status"]
)
chat_latency = Histogram(
"chat_latency_seconds",
"Chat request latency in seconds"
)
@app.post("/api/v1/chat")
async def chat(
chat_request: ChatRequest,
user_id: int = Depends(get_current_user)
):
with chat_latency.time():
try:
response = await chat_with_memory(
chat_request.message,
chat_request.thread_id or f"user-{user_id}"
)
chat_requests.labels(user_id=user_id, status="success").inc()
return {"response": response}
except Exception as e:
chat_requests.labels(user_id=user_id, status="error").inc()
raise10. Circuit Breaker Pattern
python
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
from typing import Optional
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, timeout: int = 60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.last_failure_time: Optional[float] = None
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
async def call(self, func, *args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.timeout:
self.state = "HALF_OPEN"
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def on_success(self):
self.failure_count = 0
self.state = "CLOSED"
def on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
# Usage
llm_circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type(Exception)
)
async def call_llm_with_retry(message: str):
return await llm_circuit_breaker.call(llm.ainvoke, message)Evaluation Framework
LLM-as-a-Judge
python
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
class EvaluationResult(BaseModel):
score: float
reasoning: str
eval_llm = ChatOpenAI(model="gpt-4", temperature=0)
EVAL_PROMPT = """
Evaluate the following AI agent response based on these criteria:
- Accuracy: Is the response factually correct?
- Relevance: Does it address the user's question?
- Completeness: Does it provide a thorough answer?
- Safety: Is the response safe and appropriate?
User Query: {query}
Agent Response: {response}
Provide a score from 0-10 and explain your reasoning.
"""
async def evaluate_response(query: str, response: str) -> EvaluationResult:
prompt = EVAL_PROMPT.format(query=query, response=response)
result = await eval_llm.ainvoke(prompt)
# Parse LLM output into structured result
# Implementation depends on LLM output format
return EvaluationResult(score=8.5, reasoning=result.content)Testing
Unit Tests
python
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_chat_endpoint():
async with AsyncClient(app=app, base_url="http://test") as client:
# Login
response = await client.post(
"/api/v1/auth/login",
json={"email": "test@example.com", "password": "testpass"}
)
token = response.json()["access_token"]
# Chat
response = await client.post(
"/api/v1/chat",
json={"message": "Hello, agent!"},
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
assert "response" in response.json()Load Testing
python
import asyncio
from httpx import AsyncClient
import time
async def simulate_user(client: AsyncClient, token: str, num_requests: int):
for i in range(num_requests):
await client.post(
"/api/v1/chat",
json={"message": f"Test message {i}"},
headers={"Authorization": f"Bearer {token}"}
)
async def load_test(num_users: int, requests_per_user: int):
start_time = time.time()
async with AsyncClient(base_url="http://localhost:8000") as client:
# Create tasks for concurrent users
tasks = [
simulate_user(client, "test_token", requests_per_user)
for _ in range(num_users)
]
await asyncio.gather(*tasks)
duration = time.time() - start_time
total_requests = num_users * requests_per_user
print(f"Completed {total_requests} requests in {duration:.2f}s")
print(f"Throughput: {total_requests/duration:.2f} req/s")
# Run load test
asyncio.run(load_test(num_users=100, requests_per_user=10))Common Patterns
Context Management
python
from contextvars import ContextVar
from typing import Optional
request_id_var: ContextVar[Optional[str]] = ContextVar("request_id", default=None)
@app.middleware("http")
async def add_request_id(request: Request, call_next):
request_id = str(uuid.uuid4())
request_id_var.set(request_id)
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return responseStructured Logging
python
import structlog
logger = structlog.get_logger()
async def process_chat(message: str, user_id: int):
logger.info(
"chat_request_received",
user_id=user_id,
message_length=len(message)
)
try:
response = await chat_with_memory(message, f"user-{user_id}")
logger.info("chat_response_generated", user_id=user_id)
return response
except Exception as e:
logger.error(
"chat_processing_failed",
user_id=user_id,
error=str(e),
exc_info=True
)
raiseTroubleshooting
Database Connection Issues
python
# Check connection pool health
from sqlmodel import Session, create_engine
engine = create_engine(os.getenv("DATABASE_URL"), pool_pre_ping=True)
def check_db_health():
try:
with Session(engine) as session:
session.exec("SELECT 1")
return True
except Exception as e:
logger.error("database_health_check_failed", error=str(e))
return FalseLLM Timeout Handling
python
import asyncio
async def call_llm_with_timeout(message: str, timeout: int = 30):
try:
return await asyncio.wait_for(
llm.ainvoke(message),
timeout=timeout
)
except asyncio.TimeoutError:
logger.error("llm_call_timeout", timeout=timeout)
raise HTTPException(status_code=504, detail="LLM request timeout")Memory Leak Prevention
python
# Clear old conversations periodically
from datetime import datetime, timedelta
async def cleanup_old_conversations():
cutoff_date = datetime.utcnow() - timedelta(days=30)
with Session(engine) as session:
old_conversations = session.exec(
select(Conversation).where(Conversation.updated_at < cutoff_date)
)
for conv in old_conversations:
session.delete(conv)
session.commit()Rate Limit Debugging
python
from slowapi.errors import RateLimitExceeded
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
logger.warning(
"rate_limit_exceeded",
client_ip=get_remote_address(request),
path=request.url.path
)
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded. Please try again later."}
)Running the Application
Development Mode
bash
# Run with auto-reload
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# Run with specific log level
uvicorn app.main:app --log-level debugProduction Mode
bash
# Run with uvloop and multiple workers
uvicorn app.main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--loop uvloop \
--log-config logging.yamlAccessing Services
- API: http://localhost:8000
- API Docs: http://localhost:8000/docs
- Prometheus: http://localhost:9090
- Grafana: http://localhost:3000 (admin/admin)
- Metrics: http://localhost:8000/metrics
Additional Resources
- Medium Article: Detailed architecture explanation
- LangGraph Documentation
- FastAPI Documentation
- Prometheus Best Practices