fastapi-templates

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

FastAPI Project Templates

FastAPI项目模板

Production-ready FastAPI project structures with async patterns, dependency injection, middleware, and best practices for building high-performance APIs.
具备异步模式、依赖注入、中间件的生产级FastAPI项目结构,以及构建高性能API的最佳实践。

When to Use This Skill

何时使用此技能

  • Starting new FastAPI projects from scratch
  • Implementing async REST APIs with Python
  • Building high-performance web services and microservices
  • Creating async applications with PostgreSQL, MongoDB
  • Setting up API projects with proper structure and testing
  • 从零开始创建新的FastAPI项目
  • 使用Python实现异步REST API
  • 构建高性能Web服务和微服务
  • 创建基于PostgreSQL、MongoDB的异步应用
  • 搭建具备规范结构和测试体系的API项目

Core Concepts

核心概念

1. Project Structure

1. 项目结构

Recommended Layout:
app/
├── api/                    # API routes
│   ├── v1/
│   │   ├── endpoints/
│   │   │   ├── users.py
│   │   │   ├── auth.py
│   │   │   └── items.py
│   │   └── router.py
│   └── dependencies.py     # Shared dependencies
├── core/                   # Core configuration
│   ├── config.py
│   ├── security.py
│   └── database.py
├── models/                 # Database models
│   ├── user.py
│   └── item.py
├── schemas/                # Pydantic schemas
│   ├── user.py
│   └── item.py
├── services/               # Business logic
│   ├── user_service.py
│   └── auth_service.py
├── repositories/           # Data access
│   ├── user_repository.py
│   └── item_repository.py
└── main.py                 # Application entry
推荐布局:
app/
├── api/                    # API路由
│   ├── v1/
│   │   ├── endpoints/
│   │   │   ├── users.py
│   │   │   ├── auth.py
│   │   │   └── items.py
│   │   └── router.py
│   └── dependencies.py     # 共享依赖
├── core/                   # 核心配置
│   ├── config.py
│   ├── security.py
│   └── database.py
├── models/                 # 数据库模型
│   ├── user.py
│   └── item.py
├── schemas/                # Pydantic 模式
│   ├── user.py
│   └── item.py
├── services/               # 业务逻辑
│   ├── user_service.py
│   └── auth_service.py
├── repositories/           # 数据访问层
│   ├── user_repository.py
│   └── item_repository.py
└── main.py                 # 应用入口

2. Dependency Injection

2. 依赖注入

FastAPI's built-in DI system using
Depends
:
  • Database session management
  • Authentication/authorization
  • Shared business logic
  • Configuration injection
FastAPI内置的基于
Depends
的DI系统:
  • 数据库会话管理
  • 身份验证/授权
  • 共享业务逻辑
  • 配置注入

3. Async Patterns

3. 异步模式

Proper async/await usage:
  • Async route handlers
  • Async database operations
  • Async background tasks
  • Async middleware
正确使用async/await的方式:
  • 异步路由处理器
  • 异步数据库操作
  • 异步后台任务
  • 异步中间件

Implementation Patterns

实现模式

Pattern 1: Complete FastAPI Application

模式1:完整FastAPI应用

python
undefined
python
undefined

main.py

main.py

from fastapi import FastAPI, Depends from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager
@asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan events.""" # Startup await database.connect() yield # Shutdown await database.disconnect()
app = FastAPI( title="API Template", version="1.0.0", lifespan=lifespan )
from fastapi import FastAPI, Depends from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager
@asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期事件。""" # 启动 await database.connect() yield # 关闭 await database.disconnect()
app = FastAPI( title="API模板", version="1.0.0", lifespan=lifespan )

CORS middleware

CORS中间件

app.add_middleware( CORSMiddleware, allow_origins=[""], allow_credentials=True, allow_methods=[""], allow_headers=["*"], )
app.add_middleware( CORSMiddleware, allow_origins=[""], allow_credentials=True, allow_methods=[""], allow_headers=["*"], )

Include routers

包含路由

from app.api.v1.router import api_router app.include_router(api_router, prefix="/api/v1")
from app.api.v1.router import api_router app.include_router(api_router, prefix="/api/v1")

core/config.py

core/config.py

from pydantic_settings import BaseSettings from functools import lru_cache
class Settings(BaseSettings): """Application settings.""" DATABASE_URL: str SECRET_KEY: str ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 API_V1_STR: str = "/api/v1"
class Config:
    env_file = ".env"
@lru_cache() def get_settings() -> Settings: return Settings()
from pydantic_settings import BaseSettings from functools import lru_cache
class Settings(BaseSettings): """应用配置。""" DATABASE_URL: str SECRET_KEY: str ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 API_V1_STR: str = "/api/v1"
class Config:
    env_file = ".env"
@lru_cache() def get_settings() -> Settings: return Settings()

core/database.py

core/database.py

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from app.core.config import get_settings
settings = get_settings()
engine = create_async_engine( settings.DATABASE_URL, echo=True, future=True )
AsyncSessionLocal = sessionmaker( engine, class_=AsyncSession, expire_on_commit=False )
Base = declarative_base()
async def get_db() -> AsyncSession: """Dependency for database session.""" async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close()
undefined
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from app.core.config import get_settings
settings = get_settings()
engine = create_async_engine( settings.DATABASE_URL, echo=True, future=True )
AsyncSessionLocal = sessionmaker( engine, class_=AsyncSession, expire_on_commit=False )
Base = declarative_base()
async def get_db() -> AsyncSession: """数据库会话依赖。""" async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close()
undefined

Pattern 2: CRUD Repository Pattern

模式2:CRUD仓库模式

python
undefined
python
undefined

repositories/base_repository.py

repositories/base_repository.py

from typing import Generic, TypeVar, Type, Optional, List from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from pydantic import BaseModel
ModelType = TypeVar("ModelType") CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel) UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]): """Base repository for CRUD operations."""
def __init__(self, model: Type[ModelType]):
    self.model = model

async def get(self, db: AsyncSession, id: int) -> Optional[ModelType]:
    """Get by ID."""
    result = await db.execute(
        select(self.model).where(self.model.id == id)
    )
    return result.scalars().first()

async def get_multi(
    self,
    db: AsyncSession,
    skip: int = 0,
    limit: int = 100
) -> List[ModelType]:
    """Get multiple records."""
    result = await db.execute(
        select(self.model).offset(skip).limit(limit)
    )
    return result.scalars().all()

async def create(
    self,
    db: AsyncSession,
    obj_in: CreateSchemaType
) -> ModelType:
    """Create new record."""
    db_obj = self.model(**obj_in.dict())
    db.add(db_obj)
    await db.flush()
    await db.refresh(db_obj)
    return db_obj

async def update(
    self,
    db: AsyncSession,
    db_obj: ModelType,
    obj_in: UpdateSchemaType
) -> ModelType:
    """Update record."""
    update_data = obj_in.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(db_obj, field, value)
    await db.flush()
    await db.refresh(db_obj)
    return db_obj

async def delete(self, db: AsyncSession, id: int) -> bool:
    """Delete record."""
    obj = await self.get(db, id)
    if obj:
        await db.delete(obj)
        return True
    return False
from typing import Generic, TypeVar, Type, Optional, List from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from pydantic import BaseModel
ModelType = TypeVar("ModelType") CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel) UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]): """CRUD操作的基础仓库类。"""
def __init__(self, model: Type[ModelType]):
    self.model = model

async def get(self, db: AsyncSession, id: int) -> Optional[ModelType]:
    """根据ID获取记录。"""
    result = await db.execute(
        select(self.model).where(self.model.id == id)
    )
    return result.scalars().first()

async def get_multi(
    self,
    db: AsyncSession,
    skip: int = 0,
    limit: int = 100
) -> List[ModelType]:
    """获取多条记录。"""
    result = await db.execute(
        select(self.model).offset(skip).limit(limit)
    )
    return result.scalars().all()

async def create(
    self,
    db: AsyncSession,
    obj_in: CreateSchemaType
) -> ModelType:
    """创建新记录。"""
    db_obj = self.model(**obj_in.dict())
    db.add(db_obj)
    await db.flush()
    await db.refresh(db_obj)
    return db_obj

async def update(
    self,
    db: AsyncSession,
    db_obj: ModelType,
    obj_in: UpdateSchemaType
) -> ModelType:
    """更新记录。"""
    update_data = obj_in.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(db_obj, field, value)
    await db.flush()
    await db.refresh(db_obj)
    return db_obj

async def delete(self, db: AsyncSession, id: int) -> bool:
    """删除记录。"""
    obj = await self.get(db, id)
    if obj:
        await db.delete(obj)
        return True
    return False

repositories/user_repository.py

repositories/user_repository.py

from app.repositories.base_repository import BaseRepository from app.models.user import User from app.schemas.user import UserCreate, UserUpdate
class UserRepository(BaseRepository[User, UserCreate, UserUpdate]): """User-specific repository."""
async def get_by_email(self, db: AsyncSession, email: str) -> Optional[User]:
    """Get user by email."""
    result = await db.execute(
        select(User).where(User.email == email)
    )
    return result.scalars().first()

async def is_active(self, db: AsyncSession, user_id: int) -> bool:
    """Check if user is active."""
    user = await self.get(db, user_id)
    return user.is_active if user else False
user_repository = UserRepository(User)
undefined
from app.repositories.base_repository import BaseRepository from app.models.user import User from app.schemas.user import UserCreate, UserUpdate
class UserRepository(BaseRepository[User, UserCreate, UserUpdate]): """用户专属仓库类。"""
async def get_by_email(self, db: AsyncSession, email: str) -> Optional[User]:
    """根据邮箱获取用户。"""
    result = await db.execute(
        select(User).where(User.email == email)
    )
    return result.scalars().first()

async def is_active(self, db: AsyncSession, user_id: int) -> bool:
    """检查用户是否激活。"""
    user = await self.get(db, user_id)
    return user.is_active if user else False
user_repository = UserRepository(User)
undefined

Pattern 3: Service Layer

模式3:服务层

python
undefined
python
undefined

services/user_service.py

services/user_service.py

from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession from app.repositories.user_repository import user_repository from app.schemas.user import UserCreate, UserUpdate, User from app.core.security import get_password_hash, verify_password
class UserService: """Business logic for users."""
def __init__(self):
    self.repository = user_repository

async def create_user(
    self,
    db: AsyncSession,
    user_in: UserCreate
) -> User:
    """Create new user with hashed password."""
    # Check if email exists
    existing = await self.repository.get_by_email(db, user_in.email)
    if existing:
        raise ValueError("Email already registered")

    # Hash password
    user_in_dict = user_in.dict()
    user_in_dict["hashed_password"] = get_password_hash(user_in_dict.pop("password"))

    # Create user
    user = await self.repository.create(db, UserCreate(**user_in_dict))
    return user

async def authenticate(
    self,
    db: AsyncSession,
    email: str,
    password: str
) -> Optional[User]:
    """Authenticate user."""
    user = await self.repository.get_by_email(db, email)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

async def update_user(
    self,
    db: AsyncSession,
    user_id: int,
    user_in: UserUpdate
) -> Optional[User]:
    """Update user."""
    user = await self.repository.get(db, user_id)
    if not user:
        return None

    if user_in.password:
        user_in_dict = user_in.dict(exclude_unset=True)
        user_in_dict["hashed_password"] = get_password_hash(
            user_in_dict.pop("password")
        )
        user_in = UserUpdate(**user_in_dict)

    return await self.repository.update(db, user, user_in)
user_service = UserService()
undefined
from typing import Optional from sqlalchemy.ext.asyncio import AsyncSession from app.repositories.user_repository import user_repository from app.schemas.user import UserCreate, UserUpdate, User from app.core.security import get_password_hash, verify_password
class UserService: """用户相关业务逻辑。"""
def __init__(self):
    self.repository = user_repository

async def create_user(
    self,
    db: AsyncSession,
    user_in: UserCreate
) -> User:
    """创建带哈希密码的新用户。"""
    # 检查邮箱是否已存在
    existing = await self.repository.get_by_email(db, user_in.email)
    if existing:
        raise ValueError("邮箱已注册")

    # 哈希密码
    user_in_dict = user_in.dict()
    user_in_dict["hashed_password"] = get_password_hash(user_in_dict.pop("password"))

    # 创建用户
    user = await self.repository.create(db, UserCreate(**user_in_dict))
    return user

async def authenticate(
    self,
    db: AsyncSession,
    email: str,
    password: str
) -> Optional[User]:
    """验证用户身份。"""
    user = await self.repository.get_by_email(db, email)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

async def update_user(
    self,
    db: AsyncSession,
    user_id: int,
    user_in: UserUpdate
) -> Optional[User]:
    """更新用户信息。"""
    user = await self.repository.get(db, user_id)
    if not user:
        return None

    if user_in.password:
        user_in_dict = user_in.dict(exclude_unset=True)
        user_in_dict["hashed_password"] = get_password_hash(
            user_in_dict.pop("password")
        )
        user_in = UserUpdate(**user_in_dict)

    return await self.repository.update(db, user, user_in)
user_service = UserService()
undefined

Pattern 4: API Endpoints with Dependencies

模式4:带依赖的API端点

python
undefined
python
undefined

api/v1/endpoints/users.py

api/v1/endpoints/users.py

from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from typing import List
from app.core.database import get_db from app.schemas.user import User, UserCreate, UserUpdate from app.services.user_service import user_service from app.api.dependencies import get_current_user
router = APIRouter()
@router.post("/", response_model=User, status_code=status.HTTP_201_CREATED) async def create_user( user_in: UserCreate, db: AsyncSession = Depends(get_db) ): """Create new user.""" try: user = await user_service.create_user(db, user_in) return user except ValueError as e: raise HTTPException(status_code=400, detail=str(e))
@router.get("/me", response_model=User) async def read_current_user( current_user: User = Depends(get_current_user) ): """Get current user.""" return current_user
@router.get("/{user_id}", response_model=User) async def read_user( user_id: int, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user) ): """Get user by ID.""" user = await user_service.repository.get(db, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") return user
@router.patch("/{user_id}", response_model=User) async def update_user( user_id: int, user_in: UserUpdate, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user) ): """Update user.""" if current_user.id != user_id: raise HTTPException(status_code=403, detail="Not authorized")
user = await user_service.update_user(db, user_id, user_in)
if not user:
    raise HTTPException(status_code=404, detail="User not found")
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_user( user_id: int, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user) ): """Delete user.""" if current_user.id != user_id: raise HTTPException(status_code=403, detail="Not authorized")
deleted = await user_service.repository.delete(db, user_id)
if not deleted:
    raise HTTPException(status_code=404, detail="User not found")
undefined
from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from typing import List
from app.core.database import get_db from app.schemas.user import User, UserCreate, UserUpdate from app.services.user_service import user_service from app.api.dependencies import get_current_user
router = APIRouter()
@router.post("/", response_model=User, status_code=status.HTTP_201_CREATED) async def create_user( user_in: UserCreate, db: AsyncSession = Depends(get_db) ): """创建新用户。""" try: user = await user_service.create_user(db, user_in) return user except ValueError as e: raise HTTPException(status_code=400, detail=str(e))
@router.get("/me", response_model=User) async def read_current_user( current_user: User = Depends(get_current_user) ): """获取当前用户信息。""" return current_user
@router.get("/{user_id}", response_model=User) async def read_user( user_id: int, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user) ): """根据ID获取用户信息。""" user = await user_service.repository.get(db, user_id) if not user: raise HTTPException(status_code=404, detail="用户不存在") return user
@router.patch("/{user_id}", response_model=User) async def update_user( user_id: int, user_in: UserUpdate, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user) ): """更新用户信息。""" if current_user.id != user_id: raise HTTPException(status_code=403, detail="无权限操作")
user = await user_service.update_user(db, user_id, user_in)
if not user:
    raise HTTPException(status_code=404, detail="用户不存在")
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_user( user_id: int, db: AsyncSession = Depends(get_db), current_user: User = Depends(get_current_user) ): """删除用户。""" if current_user.id != user_id: raise HTTPException(status_code=403, detail="无权限操作")
deleted = await user_service.repository.delete(db, user_id)
if not deleted:
    raise HTTPException(status_code=404, detail="用户不存在")
undefined

Pattern 5: Authentication & Authorization

模式5:身份验证与授权

python
undefined
python
undefined

core/security.py

core/security.py

from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from app.core.config import get_settings
settings = get_settings() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """Create JWT access token.""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify password against hash.""" return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str: """Hash password.""" return pwd_context.hash(password)
from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from app.core.config import get_settings
settings = get_settings() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """创建JWT访问令牌。""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool: """验证明文密码与哈希值是否匹配。""" return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str: """生成密码哈希值。""" return pwd_context.hash(password)

api/dependencies.py

api/dependencies.py

from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db from app.core.security import ALGORITHM from app.core.config import get_settings from app.repositories.user_repository import user_repository
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
async def get_current_user( db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme) ): """Get current authenticated user.""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, )
try:
    payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    user_id: int = payload.get("sub")
    if user_id is None:
        raise credentials_exception
except JWTError:
    raise credentials_exception

user = await user_repository.get(db, user_id)
if user is None:
    raise credentials_exception

return user
undefined
from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db from app.core.security import ALGORITHM from app.core.config import get_settings from app.repositories.user_repository import user_repository
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
async def get_current_user( db: AsyncSession = Depends(get_db), token: str = Depends(oauth2_scheme) ): """获取当前已认证用户。""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无法验证凭据", headers={"WWW-Authenticate": "Bearer"}, )
try:
    payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
    user_id: int = payload.get("sub")
    if user_id is None:
        raise credentials_exception
except JWTError:
    raise credentials_exception

user = await user_repository.get(db, user_id)
if user is None:
    raise credentials_exception

return user
undefined

Testing

测试

python
undefined
python
undefined

tests/conftest.py

tests/conftest.py

import pytest import asyncio from httpx import AsyncClient from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker
from app.main import app from app.core.database import get_db, Base
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture(scope="session") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close()
@pytest.fixture async def db_session(): engine = create_async_engine(TEST_DATABASE_URL, echo=True) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async with AsyncSessionLocal() as session:
    yield session
@pytest.fixture async def client(db_session): async def override_get_db(): yield db_session
app.dependency_overrides[get_db] = override_get_db

async with AsyncClient(app=app, base_url="http://test") as client:
    yield client
import pytest import asyncio from httpx import AsyncClient from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker
from app.main import app from app.core.database import get_db, Base
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"
@pytest.fixture(scope="session") def event_loop(): loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close()
@pytest.fixture async def db_session(): engine = create_async_engine(TEST_DATABASE_URL, echo=True) async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async with AsyncSessionLocal() as session:
    yield session
@pytest.fixture async def client(db_session): async def override_get_db(): yield db_session
app.dependency_overrides[get_db] = override_get_db

async with AsyncClient(app=app, base_url="http://test") as client:
    yield client

tests/test_users.py

tests/test_users.py

import pytest
@pytest.mark.asyncio async def test_create_user(client): response = await client.post( "/api/v1/users/", json={ "email": "test@example.com", "password": "testpass123", "name": "Test User" } ) assert response.status_code == 201 data = response.json() assert data["email"] == "test@example.com" assert "id" in data
undefined
import pytest
@pytest.mark.asyncio async def test_create_user(client): response = await client.post( "/api/v1/users/", json={ "email": "test@example.com", "password": "testpass123", "name": "Test User" } ) assert response.status_code == 201 data = response.json() assert data["email"] == "test@example.com" assert "id" in data
undefined

Resources

资源

  • references/fastapi-architecture.md: Detailed architecture guide
  • references/async-best-practices.md: Async/await patterns
  • references/testing-strategies.md: Comprehensive testing guide
  • assets/project-template/: Complete FastAPI project
  • assets/docker-compose.yml: Development environment setup
  • references/fastapi-architecture.md: 详细架构指南
  • references/async-best-practices.md: 异步/等待模式指南
  • references/testing-strategies.md: 全面测试指南
  • assets/project-template/: 完整FastAPI项目模板
  • assets/docker-compose.yml: 开发环境配置

Best Practices

最佳实践

  1. Async All The Way: Use async for database, external APIs
  2. Dependency Injection: Leverage FastAPI's DI system
  3. Repository Pattern: Separate data access from business logic
  4. Service Layer: Keep business logic out of routes
  5. Pydantic Schemas: Strong typing for request/response
  6. Error Handling: Consistent error responses
  7. Testing: Test all layers independently
  1. 全异步实现: 数据库、外部API均使用异步方式
  2. 依赖注入: 充分利用FastAPI的DI系统
  3. 仓库模式: 数据访问层与业务逻辑分离
  4. 服务层: 业务逻辑不直接写在路由中
  5. Pydantic模式: 请求/响应使用强类型校验
  6. 错误处理: 统一的错误响应格式
  7. 测试: 独立测试所有层级

Common Pitfalls

常见陷阱

  • Blocking Code in Async: Using synchronous database drivers
  • No Service Layer: Business logic in route handlers
  • Missing Type Hints: Loses FastAPI's benefits
  • Ignoring Sessions: Not properly managing database sessions
  • No Testing: Skipping integration tests
  • Tight Coupling: Direct database access in routes
  • 异步代码中的阻塞操作: 使用同步数据库驱动
  • 无服务层: 业务逻辑直接写在路由处理器中
  • 缺少类型提示: 丢失FastAPI的核心优势
  • 会话管理不当: 未正确管理数据库会话
  • 跳过测试: 忽略集成测试
  • 紧耦合: 路由中直接访问数据库