sqlalchemy-2-async

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

SQLAlchemy 2.0 Async Patterns ()

SQLAlchemy 2.0 异步模式

Modern async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.
基于SQLAlchemy 2.0、AsyncSession的现代异步数据库模式,以及与FastAPI的集成方案。

Overview

概述

  • Building async FastAPI applications with database access
  • Implementing async repository patterns
  • Configuring async connection pooling
  • Running concurrent database queries
  • Avoiding N+1 queries in async context
  • 构建支持数据库访问的异步FastAPI应用
  • 实现异步Repository模式
  • 配置异步连接池
  • 执行并发数据库查询
  • 在异步环境中避免N+1查询问题

Quick Reference

快速参考

Engine and Session Factory

引擎与会话工厂

python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)
python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

Create async engine - ONE per application

Create async engine - ONE per application

engine = create_async_engine( "postgresql+asyncpg://user:pass@localhost/db", pool_size=20, max_overflow=10, pool_pre_ping=True, # Verify connections before use pool_recycle=3600, # Recycle connections after 1 hour echo=False, # Set True for SQL logging in dev )
engine = create_async_engine( "postgresql+asyncpg://user:pass@localhost/db", pool_size=20, max_overflow=10, pool_pre_ping=True, # Verify connections before use pool_recycle=3600, # Recycle connections after 1 hour echo=False, # Set True for SQL logging in dev )

Session factory - use this to create sessions

Session factory - use this to create sessions

async_session_factory = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, # Prevent lazy load issues autoflush=False, # Explicit flush control )
undefined
async_session_factory = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, # Prevent lazy load issues autoflush=False, # Explicit flush control )
undefined

FastAPI Dependency Injection

FastAPI依赖注入

python
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that provides async database session."""
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
python
from typing import AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that provides async database session."""
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Usage in route

Usage in route

@router.get("/users/{user_id}") async def get_user( user_id: UUID, db: AsyncSession = Depends(get_db), ) -> UserResponse: result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(404, "User not found") return UserResponse.model_validate(user)
undefined
@router.get("/users/{user_id}") async def get_user( user_id: UUID, db: AsyncSession = Depends(get_db), ) -> UserResponse: result = await db.execute( select(User).where(User.id == user_id) ) user = result.scalar_one_or_none() if not user: raise HTTPException(404, "User not found") return UserResponse.model_validate(user)
undefined

Async Model Definition

异步模型定义

python
from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from datetime import datetime, timezone
import uuid

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))

    # Relationship with explicit lazy loading strategy
    orders: Mapped[list["Order"]] = relationship(
        back_populates="user",
        lazy="raise",  # Prevent accidental lazy loads - MUST use selectinload
    )

class Order(Base):
    __tablename__ = "orders"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
    user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]

    user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")
python
from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID
from datetime import datetime, timezone
import uuid

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))

    # Relationship with explicit lazy loading strategy
    orders: Mapped[list["Order"]] = relationship(
        back_populates="user",
        lazy="raise",  # Prevent accidental lazy loads - MUST use selectinload
    )

class Order(Base):
    __tablename__ = "orders"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
    user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]

    user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")

Eager Loading (Avoid N+1)

预加载(避免N+1查询)

python
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy import select

async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
    """Load user with orders in single query - NO N+1."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))  # Eager load orders
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
    """Load multiple users with orders efficiently."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .limit(limit)
    )
    return list(result.scalars().all())
python
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy import select

async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
    """Load user with orders in single query - NO N+1."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))  # Eager load orders
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
    """Load multiple users with orders efficiently."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .limit(limit)
    )
    return list(result.scalars().all())

Bulk Operations ( Optimized)

批量操作(优化版)

python
async def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
    """Efficient bulk insert - SQLAlchemy 2.0 uses multi-value INSERT."""
    # SQLAlchemy 2.0 automatically batches as single INSERT with multiple VALUES
    users = [User(**data) for data in users_data]
    db.add_all(users)
    await db.flush()  # Get IDs without committing
    return len(users)

async def bulk_insert_chunked(
    db: AsyncSession,
    items: list[dict],
    chunk_size: int = 1000,
) -> int:
    """Insert large datasets in chunks to manage memory."""
    total = 0
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        db.add_all([Item(**data) for data in chunk])
        await db.flush()
        total += len(chunk)
    return total
python
async def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
    """Efficient bulk insert - SQLAlchemy 2.0 uses multi-value INSERT."""
    # SQLAlchemy 2.0 automatically batches as single INSERT with multiple VALUES
    users = [User(**data) for data in users_data]
    db.add_all(users)
    await db.flush()  # Get IDs without committing
    return len(users)

async def bulk_insert_chunked(
    db: AsyncSession,
    items: list[dict],
    chunk_size: int = 1000,
) -> int:
    """Insert large datasets in chunks to manage memory."""
    total = 0
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        db.add_all([Item(**data) for data in chunk])
        await db.flush()
        total += len(chunk)
    return total

Repository Pattern

Repository模式

python
from typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T", bound=Base)

class AsyncRepository(Generic[T]):
    """Generic async repository for CRUD operations."""

    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model

    async def get(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)

    async def get_many(self, ids: list[UUID]) -> list[T]:
        result = await self.session.execute(
            select(self.model).where(self.model.id.in_(ids))
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        return instance

    async def update(self, instance: T, **kwargs) -> T:
        for key, value in kwargs.items():
            setattr(instance, key, value)
        await self.session.flush()
        return instance

    async def delete(self, instance: T) -> None:
        await self.session.delete(instance)
        await self.session.flush()
python
from typing import Generic, TypeVar
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T", bound=Base)

class AsyncRepository(Generic[T]):
    """Generic async repository for CRUD operations."""

    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model

    async def get(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)

    async def get_many(self, ids: list[UUID]) -> list[T]:
        result = await self.session.execute(
            select(self.model).where(self.model.id.in_(ids))
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        return instance

    async def update(self, instance: T, **kwargs) -> T:
        for key, value in kwargs.items():
            setattr(instance, key, value)
        await self.session.flush()
        return instance

    async def delete(self, instance: T) -> None:
        await self.session.delete(instance)
        await self.session.flush()

Concurrent Queries with TaskGroup

基于TaskGroup的并发查询

python
import asyncio

async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
    """Run multiple queries concurrently - same session is NOT thread-safe."""
    # WRONG: Don't share AsyncSession across tasks
    # async with asyncio.TaskGroup() as tg:
    #     tg.create_task(db.execute(...))  # NOT SAFE

    # CORRECT: Sequential queries with same session
    user = await db.get(User, user_id)
    orders_result = await db.execute(
        select(Order).where(Order.user_id == user_id).limit(10)
    )
    stats_result = await db.execute(
        select(func.count(Order.id)).where(Order.user_id == user_id)
    )

    return {
        "user": user,
        "recent_orders": list(orders_result.scalars().all()),
        "total_orders": stats_result.scalar(),
    }

async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
    """Concurrent queries - each task gets its own session."""
    async def fetch_user(user_id: UUID) -> dict:
        async with async_session_factory() as session:
            user = await session.get(User, user_id)
            return {"id": user_id, "email": user.email if user else None}

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]

    return [t.result() for t in tasks]
python
import asyncio

async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
    """Run multiple queries concurrently - same session is NOT thread-safe."""
    # WRONG: Don't share AsyncSession across tasks
    # async with asyncio.TaskGroup() as tg:
    #     tg.create_task(db.execute(...))  # NOT SAFE

    # CORRECT: Sequential queries with same session
    user = await db.get(User, user_id)
    orders_result = await db.execute(
        select(Order).where(Order.user_id == user_id).limit(10)
    )
    stats_result = await db.execute(
        select(func.count(Order.id)).where(Order.user_id == user_id)
    )

    return {
        "user": user,
        "recent_orders": list(orders_result.scalars().all()),
        "total_orders": stats_result.scalar(),
    }

async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
    """Concurrent queries - each task gets its own session."""
    async def fetch_user(user_id: UUID) -> dict:
        async with async_session_factory() as session:
            user = await session.get(User, user_id)
            return {"id": user_id, "email": user.email if user else None}

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]

    return [t.result() for t in tasks]

Key Decisions

关键决策

DecisionRecommendationRationale
Session scopeOne AsyncSession per task/requestSQLAlchemy docs: "AsyncSession per task"
Scoped sessionsAvoid for asyncMaintainers discourage for async code
Lazy loadingUse
lazy="raise"
+ explicit loads
Prevents accidental N+1 in async
Eager loading
selectinload
for collections
Better than joinedload for async
expire_on_commitSet to
False
Prevents lazy load errors after commit
Connection pool
pool_pre_ping=True
Validates connections before use
Bulk insertsChunk 1000-10000 rowsMemory management for large inserts
决策推荐方案理由
会话作用域每个任务/请求对应一个AsyncSessionSQLAlchemy文档:“每个任务对应一个AsyncSession”
作用域会话异步场景下请避免使用维护者不建议在异步代码中使用
延迟加载使用
lazy="raise"
+ 显式加载
防止异步环境中意外出现N+1查询
预加载集合类型使用
selectinload
在异步场景下表现优于joinedload
commit后过期设置设置为
False
避免commit后出现延迟加载错误
连接池设置
pool_pre_ping=True
在使用前验证连接有效性
批量插入按1000-10000行分块对大数量插入进行内存管理

Anti-Patterns (FORBIDDEN)

反模式(禁止使用)

python
undefined
python
undefined

NEVER share AsyncSession across tasks

NEVER share AsyncSession across tasks

async with asyncio.TaskGroup() as tg: tg.create_task(session.execute(...)) # RACE CONDITION
async with asyncio.TaskGroup() as tg: tg.create_task(session.execute(...)) # RACE CONDITION

NEVER use sync Session in async code

NEVER use sync Session in async code

from sqlalchemy.orm import Session session = Session(engine) # BLOCKS EVENT LOOP
from sqlalchemy.orm import Session session = Session(engine) # BLOCKS EVENT LOOP

NEVER access lazy-loaded relationships without eager loading

NEVER access lazy-loaded relationships without eager loading

user = await session.get(User) orders = user.orders # RAISES if lazy="raise", or BLOCKS if not
user = await session.get(User) orders = user.orders # RAISES if lazy="raise", or BLOCKS if not

NEVER use scoped_session with async

NEVER use scoped_session with async

from sqlalchemy.orm import scoped_session ScopedSession = scoped_session(session_factory) # WRONG for async
from sqlalchemy.orm import scoped_session ScopedSession = scoped_session(session_factory) # WRONG for async

NEVER forget to handle session lifecycle

NEVER forget to handle session lifecycle

session = async_session_factory() result = await session.execute(...)
session = async_session_factory() result = await session.execute(...)

MISSING: session.close() - connection leak!

MISSING: session.close() - connection leak!

NEVER use create_async_engine without pool_pre_ping in production

NEVER use create_async_engine without pool_pre_ping in production

engine = create_async_engine(url) # May use stale connections
undefined
engine = create_async_engine(url) # May use stale connections
undefined

Related Skills

相关技能

  • asyncio-advanced
    - TaskGroup and structured concurrency patterns
  • alembic-migrations
    - Database migration with async support
  • fastapi-advanced
    - Full FastAPI integration patterns
  • database-schema-designer
    - Schema design best practices
  • asyncio-advanced
    - TaskGroup与结构化并发模式
  • alembic-migrations
    - 支持异步的数据库迁移
  • fastapi-advanced
    - 完整的FastAPI集成模式
  • database-schema-designer
    - 数据库Schema设计最佳实践

Capability Details

能力详情

async-session

async-session

Keywords: AsyncSession, async_sessionmaker, session factory, connection Solves:
  • How do I create async database sessions?
  • Configure async connection pooling
  • Session lifecycle management
关键词: AsyncSession, async_sessionmaker, 会话工厂, 连接 解决的问题:
  • 如何创建异步数据库会话?
  • 配置异步连接池
  • 会话生命周期管理

fastapi-integration

fastapi-integration

Keywords: Depends, dependency injection, get_db, request scope Solves:
  • How do I integrate SQLAlchemy with FastAPI?
  • Request-scoped database sessions
  • Automatic commit/rollback handling
关键词: Depends, 依赖注入, get_db, 请求作用域 解决的问题:
  • 如何将SQLAlchemy与FastAPI集成?
  • 请求作用域的数据库会话
  • 自动提交/回滚处理

eager-loading

eager-loading

Keywords: selectinload, joinedload, eager load, N+1, relationship Solves:
  • How do I avoid N+1 queries in async?
  • Load relationships efficiently
  • Configure lazy loading behavior
关键词: selectinload, joinedload, 预加载, N+1, 关联关系 解决的问题:
  • 如何在异步环境中避免N+1查询?
  • 高效加载关联关系
  • 配置延迟加载行为

bulk-operations

bulk-operations

Keywords: bulk insert, batch, chunk, add_all, performance Solves:
  • How do I insert many rows efficiently?
  • Chunk large inserts for memory
  • SQLAlchemy 2.0 bulk optimizations
关键词: 批量插入, 批处理, 分块, add_all, 性能 解决的问题:
  • 如何高效插入大量数据行?
  • 对大数量插入进行分块以优化内存
  • SQLAlchemy 2.0 批量操作优化

repository-pattern

repository-pattern

Keywords: repository, CRUD, generic, base repository Solves:
  • How do I implement repository pattern?
  • Generic async CRUD operations
  • Clean architecture with SQLAlchemy
关键词: repository, CRUD, 泛型, 基础repository 解决的问题:
  • 如何实现Repository模式?
  • 泛型异步CRUD操作
  • 结合SQLAlchemy的整洁架构