alembic-migrations

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Alembic Migration Patterns ()

Alembic Migration Patterns ()

Database migration management with Alembic for SQLAlchemy 2.0 async applications.
针对SQLAlchemy 2.0异步应用的数据库迁移管理方案。

Overview

概述

  • Creating or modifying database tables and columns
  • Auto-generating migrations from SQLAlchemy models
  • Implementing zero-downtime schema changes
  • Rolling back or managing migration history
  • Adding indexes on large production tables
  • Setting up Alembic with async PostgreSQL (asyncpg)
  • 创建或修改数据库表与列
  • 从SQLAlchemy模型自动生成迁移脚本
  • 实现零停机的schema变更
  • 回滚或管理迁移历史
  • 在大型生产表上添加索引
  • 配置Alembic与异步PostgreSQL(asyncpg)

Quick Reference

快速参考

Initialize Alembic (Async Template)

初始化Alembic(异步模板)

bash
undefined
bash
undefined

Initialize with async template for asyncpg

使用asyncpg异步模板初始化

alembic init -t async migrations
alembic init -t async migrations

Creates:

生成以下文件:

- alembic.ini

- alembic.ini

- migrations/env.py (async-ready)

- migrations/env.py(支持异步)

- migrations/script.py.mako

- migrations/script.py.mako

- migrations/versions/

- migrations/versions/

undefined
undefined

Async env.py Configuration

异步env.py配置

python
undefined
python
undefined

migrations/env.py

migrations/env.py

import asyncio from logging.config import fileConfig
from sqlalchemy import pool from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
import asyncio from logging.config import fileConfig
from sqlalchemy import pool from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context

Import your models' Base for autogenerate

导入模型的Base类以支持自动生成

from app.models.base import Base
config = context.config if config.config_file_name is not None: fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None: """Run migrations in 'offline' mode - generates SQL.""" url = config.get_main_option("sqlalchemy.url") context.configure( url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, ) with context.begin_transaction(): context.run_migrations()
def do_run_migrations(connection: Connection) -> None: context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations()
async def run_async_migrations() -> None: """Run migrations in 'online' mode with async engine.""" connectable = async_engine_from_config( config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, )
async with connectable.connect() as connection:
    await connection.run_sync(do_run_migrations)

await connectable.dispose()
def run_migrations_online() -> None: """Entry point for online migrations.""" asyncio.run(run_async_migrations())
if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()
undefined
from app.models.base import Base
config = context.config if config.config_file_name is not None: fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None: """在'离线'模式下运行迁移 - 生成SQL脚本。""" url = config.get_main_option("sqlalchemy.url") context.configure( url=url, target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, ) with context.begin_transaction(): context.run_migrations()
def do_run_migrations(connection: Connection) -> None: context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations()
async def run_async_migrations() -> None: """使用异步引擎在'在线'模式下运行迁移。""" connectable = async_engine_from_config( config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, )
async with connectable.connect() as connection:
    await connection.run_sync(do_run_migrations)

await connectable.dispose()
def run_migrations_online() -> None: """在线迁移的入口函数。""" asyncio.run(run_async_migrations())
if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()
undefined

Migration Template

迁移脚本模板

python
"""Add users table.

Revision ID: abc123
Revises: None
Create Date: -01-17 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None

def upgrade() -> None:
    op.create_table(
        'users',
        sa.Column('id', UUID(as_uuid=True), primary_key=True),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index('idx_users_email', 'users', ['email'], unique=True)

def downgrade() -> None:
    op.drop_index('idx_users_email', table_name='users')
    op.drop_table('users')
python
"""添加users表。

修订ID: abc123
基于版本: None
创建时间: -01-17 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None

def upgrade() -> None:
    op.create_table(
        'users',
        sa.Column('id', UUID(as_uuid=True), primary_key=True),
        sa.Column('email', sa.String(255), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index('idx_users_email', 'users', ['email'], unique=True)

def downgrade() -> None:
    op.drop_index('idx_users_email', table_name='users')
    op.drop_table('users')

Autogenerate Migration

自动生成迁移脚本

bash
undefined
bash
undefined

Generate from model changes

根据模型变更生成迁移脚本

alembic revision --autogenerate -m "add user preferences"
alembic revision --autogenerate -m "add user preferences"

Apply migrations

应用迁移

alembic upgrade head
alembic upgrade head

Rollback one step

回滚一个版本

alembic downgrade -1
alembic downgrade -1

Generate SQL for review (production)

生成SQL脚本用于审核(生产环境)

alembic upgrade head --sql > migration.sql
alembic upgrade head --sql > migration.sql

Check current revision

查看当前版本

alembic current
alembic current

Show migration history

查看迁移历史(详情)

alembic history --verbose
undefined
alembic history --verbose
undefined

Running Async Code in Migrations

在迁移中运行异步代码

python
"""Migration with async operation.

NOTE: Alembic upgrade/downgrade cannot be async, but you can
run async code using sqlalchemy.util.await_only workaround.
"""
from alembic import op
from sqlalchemy import text
from sqlalchemy.util import await_only

def upgrade() -> None:
    # Get connection (works with async dialect)
    connection = op.get_bind()

    # For async-only operations, use await_only
    # This works because Alembic runs in greenlet context
    result = await_only(
        connection.execute(text("SELECT count(*) FROM users"))
    )

    # Standard operations work normally with async engine
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
        ON users (organization_id, created_at DESC)
    """)
python
"""包含异步操作的迁移脚本。

注意:Alembic的upgrade/downgrade函数不能是异步的,但可以
使用sqlalchemy.util.await_only作为变通方案来运行异步代码。
"""
from alembic import op
from sqlalchemy import text
from sqlalchemy.util import await_only

def upgrade() -> None:
    # 获取连接(兼容异步方言)
    connection = op.get_bind()

    # 对于仅支持异步的操作,使用await_only
    # 这是可行的,因为Alembic运行在greenlet上下文环境中
    result = await_only(
        connection.execute(text("SELECT count(*) FROM users"))
    )

    # 标准操作在异步引擎下可正常执行
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
        ON users (organization_id, created_at DESC)
    """)

Concurrent Index (Zero-Downtime)

并发索引(零停机)

python
def upgrade() -> None:
    # CONCURRENTLY avoids table locks on large tables
    # IMPORTANT: Cannot run inside transaction block
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
        ON users (organization_id, created_at DESC)
    """)

def downgrade() -> None:
    op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_org")
python
def upgrade() -> None:
    # CONCURRENTLY避免对大表加锁
    # 重要:不能在事务块中运行
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
        ON users (organization_id, created_at DESC)
    """)

def downgrade() -> None:
    op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_org")

In alembic.ini or env.py, disable transaction for this migration:

在alembic.ini或env.py中,为此迁移禁用事务:

Set transaction_per_migration = false for CONCURRENTLY operations

对于CONCURRENTLY操作,设置transaction_per_migration = false

undefined
undefined

Two-Phase NOT NULL Migration

分阶段添加NOT NULL列

python
"""Add org_id column (phase 1 - nullable).

Phase 1: Add nullable column
Phase 2: Backfill data
Phase 3: Add NOT NULL (separate migration after verification)
"""

def upgrade() -> None:
    # Phase 1: Add as nullable first
    op.add_column('users', sa.Column('org_id', UUID(as_uuid=True), nullable=True))

    # Phase 2: Backfill with default org
    op.execute("""
        UPDATE users
        SET org_id = 'default-org-uuid'
        WHERE org_id IS NULL
    """)

    # Phase 3 in SEPARATE migration after app updated:
    # op.alter_column('users', 'org_id', nullable=False)

def downgrade() -> None:
    op.drop_column('users', 'org_id')
python
"""添加org_id列(第一阶段 - 允许为空)。

阶段1:添加允许为空的列
阶段2:回填数据
阶段3:设置NOT NULL(验证完成后在单独的迁移中执行)
"""

def upgrade() -> None:
    # 阶段1:先添加允许为空的列
    op.add_column('users', sa.Column('org_id', UUID(as_uuid=True), nullable=True))

    # 阶段2:用默认组织ID回填数据
    op.execute("""
        UPDATE users
        SET org_id = 'default-org-uuid'
        WHERE org_id IS NULL
    """)

    # 阶段3在应用更新完成后的单独迁移中执行:
    # op.alter_column('users', 'org_id', nullable=False)

def downgrade() -> None:
    op.drop_column('users', 'org_id')

Key Decisions

关键决策

DecisionRecommendationRationale
Async dialectUse
postgresql+asyncpg
Native async support
NOT NULL columnTwo-phase: nullable first, then alterAvoids locking, backward compatible
Large table index
CREATE INDEX CONCURRENTLY
Zero-downtime, no table locks
Column rename4-phase expand/contractSafe migration without downtime
Autogenerate reviewAlways review generated SQLMay miss custom constraints
Migration granularityOne logical change per fileEasier rollback and debugging
Production deploymentGenerate SQL, review, then applyNever auto-run in production
Downgrade functionAlways implement properlyEnsures reversibility
Transaction modeDefault on, disable for CONCURRENTLYCONCURRENTLY requires no transaction
决策项推荐方案理由
异步方言使用
postgresql+asyncpg
原生异步支持
NOT NULL列分两阶段:先允许为空,再修改约束避免加锁,向后兼容
大表索引
CREATE INDEX CONCURRENTLY
零停机,无表锁
列重命名四阶段扩展/收缩模式安全迁移无停机
自动生成脚本审核始终审核生成的SQL可能遗漏自定义约束
迁移粒度每个文件对应一个逻辑变更便于回滚和调试
生产环境部署生成SQL、审核后再应用绝不在生产环境自动运行
回滚函数始终正确实现确保可回滚性
事务模式默认开启,CONCURRENTLY操作时禁用CONCURRENTLY要求无事务

Anti-Patterns (FORBIDDEN)

反模式(禁止)

python
undefined
python
undefined

NEVER: Add NOT NULL without default or two-phase approach

绝对禁止:不设置默认值或不采用分阶段方式直接添加NOT NULL列

op.add_column('users', sa.Column('org_id', UUID, nullable=False)) # LOCKS TABLE, FAILS!
op.add_column('users', sa.Column('org_id', UUID, nullable=False)) # 会锁表,执行失败!

NEVER: Use blocking index creation on large tables

绝对禁止:在大表上使用阻塞式索引创建

op.create_index('idx_large', 'big_table', ['col']) # LOCKS TABLE - use CONCURRENTLY
op.create_index('idx_large', 'big_table', ['col']) # 会锁表 - 请使用CONCURRENTLY

NEVER: Skip downgrade implementation

绝对禁止:跳过回滚函数的实现

def downgrade(): pass # WRONG - implement proper rollback
def downgrade(): pass # 错误 - 必须实现正确的回滚逻辑

NEVER: Modify migration after deployment

绝对禁止:部署后修改迁移脚本

Create a new migration instead!

应创建新的迁移脚本!

NEVER: Run migrations automatically in production

绝对禁止:在生产环境自动运行迁移

Use: alembic upgrade head --sql > review.sql

正确做法:alembic upgrade head --sql > review.sql

NEVER: Use asyncio.run() in env.py if loop exists

绝对禁止:如果事件循环已存在,在env.py中使用asyncio.run()

Already handled by async template, but check for FastAPI lifespan conflicts

异步模板已处理此问题,但需注意FastAPI lifespan的冲突

NEVER: Run CONCURRENTLY inside transaction

绝对禁止:在事务中运行CONCURRENTLY

op.execute("BEGIN; CREATE INDEX CONCURRENTLY ...; COMMIT;") # FAILS
undefined
op.execute("BEGIN; CREATE INDEX CONCURRENTLY ...; COMMIT;") # 执行失败
undefined

Alembic with FastAPI Lifespan

Alembic与FastAPI生命周期结合

python
undefined
python
undefined

When running migrations during FastAPI startup (advanced)

在FastAPI启动时运行迁移(进阶用法)

Issue: Event loop already running

问题:事件循环已在运行

Solution 1: Run migrations before app starts (recommended)

方案1:在应用启动前运行迁移(推荐)

In entrypoint.sh:

在entrypoint.sh中:

alembic upgrade head && uvicorn app.main:app

alembic upgrade head && uvicorn app.main:app

Solution 2: Use run_sync for programmatic migrations

方案2:使用run_sync以编程方式运行迁移

from sqlalchemy import Connection from alembic import command from alembic.config import Config
async def run_migrations(connection: Connection) -> None: """Run migrations programmatically within existing async context.""" def do_upgrade(connection: Connection): config = Config("alembic.ini") config.attributes["connection"] = connection command.upgrade(config, "head")
await connection.run_sync(do_upgrade)
undefined
from sqlalchemy import Connection from alembic import command from alembic.config import Config
async def run_migrations(connection: Connection) -> None: """在现有异步上下文中以编程方式运行迁移。""" def do_upgrade(connection: Connection): config = Config("alembic.ini") config.attributes["connection"] = connection command.upgrade(config, "head")
await connection.run_sync(do_upgrade)
undefined

Related Skills

相关技能

  • database-schema-designer
    - Schema design and normalization patterns
  • database-versioning
    - Version control and change management
  • zero-downtime-migration
    - Expand/contract patterns for safe migrations
  • sqlalchemy-2-async
    - Async SQLAlchemy session patterns
  • integration-testing
    - Testing migrations with test databases
  • database-schema-designer
    - 数据库Schema设计与规范化模式
  • database-versioning
    - 版本控制与变更管理
  • zero-downtime-migration
    - 用于安全迁移的扩展/收缩模式
  • sqlalchemy-2-async
    - SQLAlchemy异步会话模式
  • integration-testing
    - 使用测试数据库测试迁移

Capability Details

能力详情

autogenerate-migrations

autogenerate-migrations

Keywords: autogenerate, auto-generate, revision, model sync, compare Solves:
  • Auto-generate migrations from SQLAlchemy models
  • Sync database with model changes
  • Detect schema drift
关键词: autogenerate, auto-generate, revision, model sync, compare 解决问题:
  • 从SQLAlchemy模型自动生成迁移脚本
  • 同步数据库与模型变更
  • 检测Schema漂移

revision-management

revision-management

Keywords: upgrade, downgrade, rollback, history, current, revision Solves:
  • Apply or rollback migrations
  • View migration history
  • Check current database version
关键词: upgrade, downgrade, rollback, history, current, revision 解决问题:
  • 应用或回滚迁移
  • 查看迁移历史
  • 检查当前数据库版本

zero-downtime-changes

zero-downtime-changes

Keywords: concurrent, expand contract, online migration, no downtime Solves:
  • Add indexes without locking
  • Rename columns safely
  • Large table migrations
关键词: concurrent, expand contract, online migration, no downtime 解决问题:
  • 无锁添加索引
  • 安全重命名列
  • 大表迁移

data-migration

data-migration

Keywords: backfill, data migration, transform, batch update Solves:
  • Backfill new columns with data
  • Transform existing data
  • Migrate between column formats
关键词: backfill, data migration, transform, batch update 解决问题:
  • 用数据回填新列
  • 转换现有数据
  • 在不同列格式间迁移

async-configuration

async-configuration

Keywords: asyncpg, async engine, env.py async, run_async_migrations Solves:
  • Configure Alembic for async SQLAlchemy
  • Run migrations with asyncpg
  • Handle existing event loop conflicts
关键词: asyncpg, async engine, env.py async, run_async_migrations 解决问题:
  • 为异步SQLAlchemy配置Alembic
  • 使用asyncpg运行迁移
  • 处理现有事件循环冲突