# SQLAlchemy 2.0 Async Patterns
Modern async database patterns with SQLAlchemy 2.0, AsyncSession, and FastAPI integration.
## Overview
- Building async FastAPI applications with database access
- Implementing async repository patterns
- Configuring async connection pooling
- Running concurrent database queries
- Avoiding N+1 queries in async context
## Quick Reference
### Engine and Session Factory
```python
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

# Create async engine - ONE per application
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,  # Verify connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
    echo=False,          # Set True for SQL logging in dev
)

# Session factory - use this to create sessions
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Prevent lazy load issues
    autoflush=False,         # Explicit flush control
)
```
### FastAPI Dependency Injection
```python
from typing import AsyncGenerator
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter()

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency that provides an async database session."""
    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Usage in a route
@router.get("/users/{user_id}")
async def get_user(
    user_id: UUID,
    db: AsyncSession = Depends(get_db),
) -> UserResponse:
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(404, "User not found")
    return UserResponse.model_validate(user)
```
### Async Model Definition
```python
import uuid
from datetime import datetime, timezone

from sqlalchemy import String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc)
    )

    # Relationship with explicit lazy loading strategy
    orders: Mapped[list["Order"]] = relationship(
        back_populates="user",
        lazy="raise",  # Prevent accidental lazy loads - MUST use selectinload
    )

class Order(Base):
    __tablename__ = "orders"

    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True)
    user_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("users.id"))
    total: Mapped[int]

    user: Mapped["User"] = relationship(back_populates="orders", lazy="raise")
```

### Eager Loading (Avoid N+1)
```python
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload

async def get_user_with_orders(db: AsyncSession, user_id: UUID) -> User | None:
    """Load user with orders in a single query - no N+1."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))  # Eager load orders
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

async def get_users_with_orders(db: AsyncSession, limit: int = 100) -> list[User]:
    """Load multiple users with orders efficiently."""
    result = await db.execute(
        select(User)
        .options(selectinload(User.orders))
        .limit(limit)
    )
    return list(result.scalars().all())
```

### Bulk Operations (Optimized)
```python
async def bulk_insert_users(db: AsyncSession, users_data: list[dict]) -> int:
    """Efficient bulk insert - SQLAlchemy 2.0 uses multi-value INSERT."""
    # SQLAlchemy 2.0 automatically batches as a single INSERT with multiple VALUES
    users = [User(**data) for data in users_data]
    db.add_all(users)
    await db.flush()  # Get IDs without committing
    return len(users)

async def bulk_insert_chunked(
    db: AsyncSession,
    items: list[dict],
    chunk_size: int = 1000,
) -> int:
    """Insert large datasets in chunks to manage memory."""
    total = 0
    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]
        db.add_all([Item(**data) for data in chunk])
        await db.flush()
        total += len(chunk)
    return total
```

### Repository Pattern
```python
from typing import Generic, TypeVar

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

T = TypeVar("T", bound=Base)

class AsyncRepository(Generic[T]):
    """Generic async repository for CRUD operations."""

    def __init__(self, session: AsyncSession, model: type[T]):
        self.session = session
        self.model = model

    async def get(self, id: UUID) -> T | None:
        return await self.session.get(self.model, id)

    async def get_many(self, ids: list[UUID]) -> list[T]:
        result = await self.session.execute(
            select(self.model).where(self.model.id.in_(ids))
        )
        return list(result.scalars().all())

    async def create(self, **kwargs) -> T:
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        return instance

    async def update(self, instance: T, **kwargs) -> T:
        for key, value in kwargs.items():
            setattr(instance, key, value)
        await self.session.flush()
        return instance

    async def delete(self, instance: T) -> None:
        await self.session.delete(instance)
        await self.session.flush()
```

### Concurrent Queries with TaskGroup
python
import asyncio
async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
"""Run multiple queries concurrently - same session is NOT thread-safe."""
# WRONG: Don't share AsyncSession across tasks
# async with asyncio.TaskGroup() as tg:
# tg.create_task(db.execute(...)) # NOT SAFE
# CORRECT: Sequential queries with same session
user = await db.get(User, user_id)
orders_result = await db.execute(
select(Order).where(Order.user_id == user_id).limit(10)
)
stats_result = await db.execute(
select(func.count(Order.id)).where(Order.user_id == user_id)
)
return {
"user": user,
"recent_orders": list(orders_result.scalars().all()),
"total_orders": stats_result.scalar(),
}
async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
"""Concurrent queries - each task gets its own session."""
async def fetch_user(user_id: UUID) -> dict:
async with async_session_factory() as session:
user = await session.get(User, user_id)
return {"id": user_id, "email": user.email if user else None}
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]
return [t.result() for t in tasks]python
import asyncio
async def get_dashboard_data(db: AsyncSession, user_id: UUID) -> dict:
"""Run multiple queries concurrently - same session is NOT thread-safe."""
# WRONG: Don't share AsyncSession across tasks
# async with asyncio.TaskGroup() as tg:
# tg.create_task(db.execute(...)) # NOT SAFE
# CORRECT: Sequential queries with same session
user = await db.get(User, user_id)
orders_result = await db.execute(
select(Order).where(Order.user_id == user_id).limit(10)
)
stats_result = await db.execute(
select(func.count(Order.id)).where(Order.user_id == user_id)
)
return {
"user": user,
"recent_orders": list(orders_result.scalars().all()),
"total_orders": stats_result.scalar(),
}
async def get_data_from_multiple_users(user_ids: list[UUID]) -> list[dict]:
"""Concurrent queries - each task gets its own session."""
async def fetch_user(user_id: UUID) -> dict:
async with async_session_factory() as session:
user = await session.get(User, user_id)
return {"id": user_id, "email": user.email if user else None}
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]
return [t.result() for t in tasks]Key Decisions
关键决策
| Decision | Recommendation | Rationale |
|---|---|---|
| Session scope | One AsyncSession per task/request | SQLAlchemy docs: "one AsyncSession per task" |
| Scoped sessions | Avoid for async | Maintainers discourage them in async code |
| Lazy loading | Use `lazy="raise"` | Prevents accidental N+1 in async |
| Eager loading | `selectinload` for collections | Better than `joinedload` for async |
| `expire_on_commit` | Set to `False` | Prevents lazy load errors after commit |
| Connection pool | `pool_pre_ping=True` | Validates connections before use |
| Bulk inserts | Chunk 1000-10000 rows | Memory management for large inserts |
## Anti-Patterns (FORBIDDEN)
```python
# NEVER share an AsyncSession across tasks
async with asyncio.TaskGroup() as tg:
    tg.create_task(session.execute(...))  # RACE CONDITION

# NEVER use a sync Session in async code
from sqlalchemy.orm import Session
session = Session(engine)  # BLOCKS EVENT LOOP

# NEVER access lazy-loaded relationships without eager loading
user = await session.get(User, user_id)
orders = user.orders  # RAISES if lazy="raise", or BLOCKS if not

# NEVER use scoped_session with async
from sqlalchemy.orm import scoped_session
ScopedSession = scoped_session(session_factory)  # WRONG for async

# NEVER forget to handle the session lifecycle
session = async_session_factory()
result = await session.execute(...)
# MISSING: await session.close() - connection leak!

# NEVER use create_async_engine without pool_pre_ping in production
engine = create_async_engine(url)  # May use stale connections
```
## Related Skills

- asyncio-advanced - TaskGroup and structured concurrency patterns
- alembic-migrations - Database migration with async support
- fastapi-advanced - Full FastAPI integration patterns
- database-schema-designer - Schema design best practices
## Capability Details
### async-session
Keywords: AsyncSession, async_sessionmaker, session factory, connection
Solves:
- How do I create async database sessions?
- Configure async connection pooling
- Session lifecycle management
### fastapi-integration
Keywords: Depends, dependency injection, get_db, request scope
Solves:
- How do I integrate SQLAlchemy with FastAPI?
- Request-scoped database sessions
- Automatic commit/rollback handling
### eager-loading
Keywords: selectinload, joinedload, eager load, N+1, relationship
Solves:
- How do I avoid N+1 queries in async?
- Load relationships efficiently
- Configure lazy loading behavior
### bulk-operations
Keywords: bulk insert, batch, chunk, add_all, performance
Solves:
- How do I insert many rows efficiently?
- Chunk large inserts for memory
- SQLAlchemy 2.0 bulk optimizations
### repository-pattern
Keywords: repository, CRUD, generic, base repository
Solves:
- How do I implement repository pattern?
- Generic async CRUD operations
- Clean architecture with SQLAlchemy