# Database Integration (SQLAlchemy Async + Alembic)


## Overview


Use SQLAlchemy 2.0+ with async support as the database toolkit for FastAPI applications. SQLAlchemy provides both ORM (Object-Relational Mapping) and Core (SQL expression) layers. Pair with Alembic for schema migrations.
Key packages:

```bash
uv add "sqlalchemy[asyncio]" alembic asyncpg    # PostgreSQL
```

or:

```bash
uv add "sqlalchemy[asyncio]" alembic aiosqlite  # SQLite
```

- `sqlalchemy[asyncio]` -- async engine and session support
- `asyncpg` -- high-performance async PostgreSQL driver
- `aiosqlite` -- async SQLite driver
- `alembic` -- database migration tool

## Async Engine and Session

### Engine Setup

```python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/mydb"
# For SQLite: "sqlite+aiosqlite:///./app.db"

engine = create_async_engine(
    DATABASE_URL,
    echo=False,          # Set True for SQL query logging
    pool_size=5,         # Number of persistent connections
    max_overflow=10,     # Additional connections allowed beyond pool_size
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
)

async_session = async_sessionmaker(
    engine,
    expire_on_commit=False,  # Prevent lazy-load after commit in async
)
```
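In practice `DATABASE_URL` is usually read from the environment rather than hard-coded; a minimal stdlib sketch (the env var name and the SQLite fallback here are assumptions, not part of this skill):

```python
import os


def database_url(default: str = "sqlite+aiosqlite:///./app.db") -> str:
    # Fall back to a local SQLite file when DATABASE_URL is not set.
    return os.environ.get("DATABASE_URL", default)
```
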

### FastAPI Integration with Lifespan

```python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Engine is created at module level; dispose on shutdown
    yield
    await engine.dispose()


app = FastAPI(lifespan=lifespan)


async def get_db() -> AsyncIterator[AsyncSession]:
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
```

Use `Depends(get_db)` in route functions to inject a session per request.

## Model Definition

### Declarative Base

```python
from datetime import datetime

from sqlalchemy import String, Text, ForeignKey, func
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    relationship,
)


class Base(DeclarativeBase):
    pass


class TimestampMixin:
    created_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
        onupdate=func.now(),
    )
```

### Model Examples

```python
class User(TimestampMixin, Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(default=True)

    # Relationships
    posts: Mapped[list["Post"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
    )


class Post(TimestampMixin, Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    author: Mapped["User"] = relationship(back_populates="posts")
```

Use `Mapped[type]` with `mapped_column()` for all column definitions (SQLAlchemy 2.0 style). Avoid the legacy `Column()` syntax.

## CRUD Operations

### Repository Pattern

```python
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession


class UserRepository:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_by_id(self, user_id: int) -> User | None:
        return await self.session.get(User, user_id)

    async def get_by_email(self, email: str) -> User | None:
        stmt = select(User).where(User.email == email)
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def list_users(
        self, *, offset: int = 0, limit: int = 20
    ) -> list[User]:
        stmt = select(User).offset(offset).limit(limit).order_by(User.id)
        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def create(self, **kwargs) -> User:
        user = User(**kwargs)
        self.session.add(user)
        await self.session.flush()  # Assign ID without committing
        return user

    async def update(self, user: User, **kwargs) -> User:
        for key, value in kwargs.items():
            setattr(user, key, value)
        await self.session.flush()
        return user

    async def delete(self, user: User) -> None:
        await self.session.delete(user)

    async def count(self) -> int:
        stmt = select(func.count()).select_from(User)
        result = await self.session.execute(stmt)
        return result.scalar_one()
```

### Using in FastAPI Routes

```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(prefix="/users", tags=["users"])


@router.get("/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    repo = UserRepository(db)
    user = await repo.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user


@router.post("/", status_code=201)
async def create_user(
    data: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    repo = UserRepository(db)
    existing = await repo.get_by_email(data.email)
    if existing:
        raise HTTPException(status_code=409, detail="Email already registered")
    user = await repo.create(**data.model_dump())
    return user
```

## Query Patterns

### Eager Loading (Avoid N+1)

```python
from sqlalchemy.orm import selectinload, joinedload

# selectinload: separate IN query (best for collections)
stmt = select(User).options(selectinload(User.posts)).where(User.id == user_id)

# joinedload: LEFT JOIN (best for single relationships)
stmt = select(Post).options(joinedload(Post.author)).where(Post.id == post_id)
```

Always use eager loading when accessing relationships in async contexts. Lazy loading raises errors under async sessions.
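To make the N+1 problem concrete, a dependency-free toy sketch that counts the queries a lazy-loading loop would issue versus one selectin-style batch (pure Python, no SQLAlchemy; the data and counter are illustrative):

```python
# Toy data: 3 users with posts, stored as if in two tables.
users = [1, 2, 3]
posts_by_user = {1: ["a"], 2: ["b", "c"], 3: []}

queries = 0


def fetch_posts_lazy(user_id):
    global queries
    queries += 1  # one SELECT per user -> N+1 queries total
    return posts_by_user[user_id]


def fetch_posts_selectin(user_ids):
    global queries
    queries += 1  # one SELECT ... WHERE user_id IN (...)
    return {uid: posts_by_user[uid] for uid in user_ids}


# Lazy loading: 1 query for users + N queries for posts
queries = 1
for uid in users:
    fetch_posts_lazy(uid)
print(queries)  # 4

# selectinload-style: 1 query for users + 1 batched query for posts
queries = 1
fetch_posts_selectin(users)
print(queries)  # 2
```

The query count grows linearly with the lazy loop but stays constant for the batched form, which is why `selectinload` is the default choice for collections.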

### Filtering and Ordering

```python
from sqlalchemy import and_, or_, desc

# Complex filters
stmt = (
    select(User)
    .where(
        and_(
            User.is_active == True,
            or_(
                User.name.ilike(f"%{query}%"),
                User.email.ilike(f"%{query}%"),
            ),
        )
    )
    .order_by(desc(User.created_at))
    .offset(offset)
    .limit(limit)
)
```

### Pagination

```python
from pydantic import BaseModel


class PaginatedResponse[T](BaseModel):
    items: list[T]
    total: int
    offset: int
    limit: int

    @property
    def has_more(self) -> bool:
        return self.offset + self.limit < self.total


async def paginate(
    session: AsyncSession,
    stmt,
    *,
    offset: int = 0,
    limit: int = 20,
) -> tuple[list, int]:
    # Count query
    count_stmt = select(func.count()).select_from(stmt.subquery())
    total = (await session.execute(count_stmt)).scalar_one()

    # Data query
    result = await session.execute(stmt.offset(offset).limit(limit))
    items = list(result.scalars().all())

    return items, total
```
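The `has_more` arithmetic above is easy to get off by one; a dependency-free restatement of the same offset/limit check (plain function, no pydantic, numbers are illustrative):

```python
def has_more(offset: int, limit: int, total: int) -> bool:
    # A page starting at `offset` with `limit` items has more results
    # only if the next page would start before `total`.
    return offset + limit < total


print(has_more(0, 20, 45))   # True  -- 45 items, first page of 20
print(has_more(40, 20, 45))  # False -- items 40..44 are the last page
```
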

## Alembic Migrations

### Setup

```bash
# Initialize Alembic
uv run alembic init alembic

# For async support, use the async template
uv run alembic init -t async alembic
```

Configure `alembic/env.py`:

```python
# alembic/env.py
from app.database import Base, DATABASE_URL
from app.models import User, Post  # Import all models

config = context.config
config.set_main_option("sqlalchemy.url", DATABASE_URL.replace("+asyncpg", ""))

target_metadata = Base.metadata
```

For the async template, update `run_async_migrations()` in `env.py` to use your async engine.
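The `.replace("+asyncpg", "")` call above simply strips the async driver marker so Alembic can run with a sync engine; a quick check using the URL from this page:

```python
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:5432/mydb"

# Alembic runs migrations synchronously, so drop the async driver suffix.
sync_url = DATABASE_URL.replace("+asyncpg", "")
print(sync_url)  # postgresql://user:pass@localhost:5432/mydb
```
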

### Migration Commands

```bash
# Generate a migration from model changes
uv run alembic revision --autogenerate -m "add users table"

# Apply all pending migrations
uv run alembic upgrade head

# Rollback one migration
uv run alembic downgrade -1

# Show current migration status
uv run alembic current

# Show migration history
uv run alembic history
```

### Migration Best Practices

1. Always review auto-generated migrations before applying -- autogenerate cannot detect all changes (renamed columns, data migrations).
2. Test migrations both ways -- run `upgrade` and `downgrade` to verify reversibility.
3. Use descriptive revision messages -- `add_users_table`, not `update`.
4. Never edit applied migrations -- create new migrations instead.
5. Include migrations in version control -- the `alembic/versions/` directory should be committed.

## Connection Pool Tuning

| Parameter | Default | Description |
| --- | --- | --- |
| `pool_size` | 5 | Number of persistent connections in the pool |
| `max_overflow` | 10 | Extra connections allowed beyond `pool_size` |
| `pool_timeout` | 30 | Seconds to wait for a connection before error |
| `pool_recycle` | -1 | Seconds before a connection is recycled (set for PG) |
| `pool_pre_ping` | False | Test connections before checkout (set True for prod) |

Production recommendation for a 4-worker FastAPI app:

```python
engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,         # 5 per worker = 20 total connections
    max_overflow=10,     # Burst to 15 per worker
    pool_pre_ping=True,  # Handle dropped connections
    pool_recycle=3600,   # Recycle hourly
)
```

Total max connections = `workers * (pool_size + max_overflow)`. Ensure the database `max_connections` setting accommodates this.
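The sizing formula can be sanity-checked with trivial arithmetic (worker count of 4 matches the example configuration above):

```python
workers = 4        # FastAPI worker processes
pool_size = 5      # persistent connections per worker
max_overflow = 10  # burst connections per worker

# Each worker may open at most pool_size + max_overflow connections.
max_total = workers * (pool_size + max_overflow)
print(max_total)  # 60 -> database max_connections must allow at least this
```
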

## Testing with Database

### In-Memory SQLite for Unit Tests

```python
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

from app.database import Base, get_db
from app.main import create_app


@pytest.fixture
async def db_engine():
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    await engine.dispose()


@pytest.fixture
async def db_session(db_engine):
    session_factory = async_sessionmaker(db_engine, expire_on_commit=False)
    async with session_factory() as session:
        yield session
        await session.rollback()


@pytest.fixture
def app(db_session):
    app = create_app()
    app.dependency_overrides[get_db] = lambda: db_session
    yield app
    app.dependency_overrides.clear()
```

### Transaction Rollback Pattern

Wrap each test in a transaction that always rolls back for isolation:

```python
@pytest.fixture
async def db_session(db_engine):
    async with db_engine.connect() as conn:
        trans = await conn.begin()
        session = AsyncSession(bind=conn, expire_on_commit=False)
        yield session
        await trans.rollback()
```

## Cross-References

- For Pydantic request/response models, consult the `pydantic` skill.
- For FastAPI routing and dependency injection, consult the `app-scaffolding` skill.
- For async patterns and error handling, consult the `async-patterns` skill.
- For Docker Compose with PostgreSQL, consult the `docker-build` skill.
- For test fixtures and pytest patterns, consult the `test-runner` skill.