alembic-migrations
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAlembic Migration Patterns ()
Alembic Migration Patterns ()
Database migration management with Alembic for SQLAlchemy 2.0 async applications.
针对SQLAlchemy 2.0异步应用的数据库迁移管理方案。
Overview
概述
- Creating or modifying database tables and columns
- Auto-generating migrations from SQLAlchemy models
- Implementing zero-downtime schema changes
- Rolling back or managing migration history
- Adding indexes on large production tables
- Setting up Alembic with async PostgreSQL (asyncpg)
- 创建或修改数据库表与列
- 从SQLAlchemy模型自动生成迁移脚本
- 实现零停机的schema变更
- 回滚或管理迁移历史
- 在大型生产表上添加索引
- 配置Alembic与异步PostgreSQL(asyncpg)
Quick Reference
快速参考
Initialize Alembic (Async Template)
初始化Alembic(异步模板)
bash
undefinedbash
undefinedInitialize with async template for asyncpg
使用asyncpg异步模板初始化
alembic init -t async migrations
alembic init -t async migrations
Creates:
生成以下文件:
- alembic.ini
- alembic.ini
- migrations/env.py (async-ready)
- migrations/env.py(支持异步)
- migrations/script.py.mako
- migrations/script.py.mako
- migrations/versions/
- migrations/versions/
undefinedundefinedAsync env.py Configuration
异步env.py配置
python
undefinedpython
undefinedmigrations/env.py
migrations/env.py
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
Import your models' Base for autogenerate
导入模型的Base类以支持自动生成
from app.models.base import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode - generates SQL."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Run migrations in 'online' mode with async engine."""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()def run_migrations_online() -> None:
"""Entry point for online migrations."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
undefinedfrom app.models.base import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""在'离线'模式下运行迁移 - 生成SQL脚本。"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""使用异步引擎在'在线'模式下运行迁移。"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()def run_migrations_online() -> None:
"""在线迁移的入口函数。"""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
undefinedMigration Template
迁移脚本模板
python
"""Add users table.
Revision ID: abc123
Revises: None
Create Date: -01-17 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
'users',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
)
op.create_index('idx_users_email', 'users', ['email'], unique=True)
def downgrade() -> None:
op.drop_index('idx_users_email', table_name='users')
op.drop_table('users')python
"""添加users表。
修订ID: abc123
基于版本: None
创建时间: -01-17 10:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
'users',
sa.Column('id', UUID(as_uuid=True), primary_key=True),
sa.Column('email', sa.String(255), nullable=False),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
)
op.create_index('idx_users_email', 'users', ['email'], unique=True)
def downgrade() -> None:
op.drop_index('idx_users_email', table_name='users')
op.drop_table('users')Autogenerate Migration
自动生成迁移脚本
bash
undefinedbash
undefinedGenerate from model changes
根据模型变更生成迁移脚本
alembic revision --autogenerate -m "add user preferences"
alembic revision --autogenerate -m "add user preferences"
Apply migrations
应用迁移
alembic upgrade head
alembic upgrade head
Rollback one step
回滚一个版本
alembic downgrade -1
alembic downgrade -1
Generate SQL for review (production)
生成SQL脚本用于审核(生产环境)
alembic upgrade head --sql > migration.sql
alembic upgrade head --sql > migration.sql
Check current revision
查看当前版本
alembic current
alembic current
Show migration history
查看迁移历史(详情)
alembic history --verbose
undefinedalembic history --verbose
undefinedRunning Async Code in Migrations
在迁移中运行异步代码
python
"""Migration with async operation.
NOTE: Alembic upgrade/downgrade cannot be async, but you can
run async code using sqlalchemy.util.await_only workaround.
"""
from alembic import op
from sqlalchemy import text
from sqlalchemy.util import await_only
def upgrade() -> None:
# Get connection (works with async dialect)
connection = op.get_bind()
# For async-only operations, use await_only
# This works because Alembic runs in greenlet context
result = await_only(
connection.execute(text("SELECT count(*) FROM users"))
)
# Standard operations work normally with async engine
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
ON users (organization_id, created_at DESC)
""")python
"""包含异步操作的迁移脚本。
注意:Alembic的upgrade/downgrade函数不能是异步的,但可以
使用sqlalchemy.util.await_only作为变通方案来运行异步代码。
"""
from alembic import op
from sqlalchemy import text
from sqlalchemy.util import await_only
def upgrade() -> None:
# 获取连接(兼容异步方言)
connection = op.get_bind()
# 对于仅支持异步的操作,使用await_only
# 这是可行的,因为Alembic运行在greenlet上下文环境中
result = await_only(
connection.execute(text("SELECT count(*) FROM users"))
)
# 标准操作在异步引擎下可正常执行
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
ON users (organization_id, created_at DESC)
""")Concurrent Index (Zero-Downtime)
并发索引(零停机)
python
def upgrade() -> None:
# CONCURRENTLY avoids table locks on large tables
# IMPORTANT: Cannot run inside transaction block
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
ON users (organization_id, created_at DESC)
""")
def downgrade() -> None:
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_org")python
def upgrade() -> None:
# CONCURRENTLY避免对大表加锁
# 重要:不能在事务块中运行
op.execute("""
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_users_org
ON users (organization_id, created_at DESC)
""")
def downgrade() -> None:
op.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_users_org")In alembic.ini or env.py, disable transaction for this migration:
在alembic.ini或env.py中,为此迁移禁用事务:
Set transaction_per_migration = false for CONCURRENTLY operations
对于CONCURRENTLY操作,设置transaction_per_migration = false
undefinedundefinedTwo-Phase NOT NULL Migration
分阶段添加NOT NULL列
python
"""Add org_id column (phase 1 - nullable).
Phase 1: Add nullable column
Phase 2: Backfill data
Phase 3: Add NOT NULL (separate migration after verification)
"""
def upgrade() -> None:
# Phase 1: Add as nullable first
op.add_column('users', sa.Column('org_id', UUID(as_uuid=True), nullable=True))
# Phase 2: Backfill with default org
op.execute("""
UPDATE users
SET org_id = 'default-org-uuid'
WHERE org_id IS NULL
""")
# Phase 3 in SEPARATE migration after app updated:
# op.alter_column('users', 'org_id', nullable=False)
def downgrade() -> None:
op.drop_column('users', 'org_id')python
"""添加org_id列(第一阶段 - 允许为空)。
阶段1:添加允许为空的列
阶段2:回填数据
阶段3:设置NOT NULL(验证完成后在单独的迁移中执行)
"""
def upgrade() -> None:
# 阶段1:先添加允许为空的列
op.add_column('users', sa.Column('org_id', UUID(as_uuid=True), nullable=True))
# 阶段2:用默认组织ID回填数据
op.execute("""
UPDATE users
SET org_id = 'default-org-uuid'
WHERE org_id IS NULL
""")
# 阶段3在应用更新完成后的单独迁移中执行:
# op.alter_column('users', 'org_id', nullable=False)
def downgrade() -> None:
op.drop_column('users', 'org_id')Key Decisions
关键决策
| Decision | Recommendation | Rationale |
|---|---|---|
| Async dialect | Use | Native async support |
| NOT NULL column | Two-phase: nullable first, then alter | Avoids locking, backward compatible |
| Large table index | | Zero-downtime, no table locks |
| Column rename | 4-phase expand/contract | Safe migration without downtime |
| Autogenerate review | Always review generated SQL | May miss custom constraints |
| Migration granularity | One logical change per file | Easier rollback and debugging |
| Production deployment | Generate SQL, review, then apply | Never auto-run in production |
| Downgrade function | Always implement properly | Ensures reversibility |
| Transaction mode | Default on, disable for CONCURRENTLY | CONCURRENTLY requires no transaction |
| 决策项 | 推荐方案 | 理由 |
|---|---|---|
| 异步方言 | 使用 | 原生异步支持 |
| NOT NULL列 | 分两阶段:先允许为空,再修改约束 | 避免加锁,向后兼容 |
| 大表索引 | | 零停机,无表锁 |
| 列重命名 | 四阶段扩展/收缩模式 | 安全迁移无停机 |
| 自动生成脚本审核 | 始终审核生成的SQL | 可能遗漏自定义约束 |
| 迁移粒度 | 每个文件对应一个逻辑变更 | 便于回滚和调试 |
| 生产环境部署 | 生成SQL、审核后再应用 | 绝不在生产环境自动运行 |
| 回滚函数 | 始终正确实现 | 确保可回滚性 |
| 事务模式 | 默认开启,CONCURRENTLY操作时禁用 | CONCURRENTLY要求无事务 |
Anti-Patterns (FORBIDDEN)
反模式(禁止)
python
undefinedpython
undefinedNEVER: Add NOT NULL without default or two-phase approach
绝对禁止:不设置默认值或不采用分阶段方式直接添加NOT NULL列
op.add_column('users', sa.Column('org_id', UUID, nullable=False)) # LOCKS TABLE, FAILS!
op.add_column('users', sa.Column('org_id', UUID, nullable=False)) # 会锁表,执行失败!
NEVER: Use blocking index creation on large tables
绝对禁止:在大表上使用阻塞式索引创建
op.create_index('idx_large', 'big_table', ['col']) # LOCKS TABLE - use CONCURRENTLY
op.create_index('idx_large', 'big_table', ['col']) # 会锁表 - 请使用CONCURRENTLY
NEVER: Skip downgrade implementation
绝对禁止:跳过回滚函数的实现
def downgrade():
pass # WRONG - implement proper rollback
def downgrade():
pass # 错误 - 必须实现正确的回滚逻辑
NEVER: Modify migration after deployment
绝对禁止:部署后修改迁移脚本
Create a new migration instead!
应创建新的迁移脚本!
NEVER: Run migrations automatically in production
绝对禁止:在生产环境自动运行迁移
Use: alembic upgrade head --sql > review.sql
正确做法:alembic upgrade head --sql > review.sql
NEVER: Use asyncio.run() in env.py if loop exists
绝对禁止:如果事件循环已存在,在env.py中使用asyncio.run()
Already handled by async template, but check for FastAPI lifespan conflicts
异步模板已处理此问题,但需注意FastAPI lifespan的冲突
NEVER: Run CONCURRENTLY inside transaction
绝对禁止:在事务中运行CONCURRENTLY
op.execute("BEGIN; CREATE INDEX CONCURRENTLY ...; COMMIT;") # FAILS
undefinedop.execute("BEGIN; CREATE INDEX CONCURRENTLY ...; COMMIT;") # 执行失败
undefinedAlembic with FastAPI Lifespan
Alembic与FastAPI生命周期结合
python
undefinedpython
undefinedWhen running migrations during FastAPI startup (advanced)
在FastAPI启动时运行迁移(进阶用法)
Issue: Event loop already running
问题:事件循环已在运行
Solution 1: Run migrations before app starts (recommended)
方案1:在应用启动前运行迁移(推荐)
In entrypoint.sh:
在entrypoint.sh中:
alembic upgrade head && uvicorn app.main:app
alembic upgrade head && uvicorn app.main:app
Solution 2: Use run_sync for programmatic migrations
方案2:使用run_sync以编程方式运行迁移
from sqlalchemy import Connection
from alembic import command
from alembic.config import Config
async def run_migrations(connection: Connection) -> None:
"""Run migrations programmatically within existing async context."""
def do_upgrade(connection: Connection):
config = Config("alembic.ini")
config.attributes["connection"] = connection
command.upgrade(config, "head")
await connection.run_sync(do_upgrade)undefinedfrom sqlalchemy import Connection
from alembic import command
from alembic.config import Config
async def run_migrations(connection: Connection) -> None:
"""在现有异步上下文中以编程方式运行迁移。"""
def do_upgrade(connection: Connection):
config = Config("alembic.ini")
config.attributes["connection"] = connection
command.upgrade(config, "head")
await connection.run_sync(do_upgrade)undefinedRelated Skills
相关技能
- - Schema design and normalization patterns
database-schema-designer - - Version control and change management
database-versioning - - Expand/contract patterns for safe migrations
zero-downtime-migration - - Async SQLAlchemy session patterns
sqlalchemy-2-async - - Testing migrations with test databases
integration-testing
- - 数据库Schema设计与规范化模式
database-schema-designer - - 版本控制与变更管理
database-versioning - - 用于安全迁移的扩展/收缩模式
zero-downtime-migration - - SQLAlchemy异步会话模式
sqlalchemy-2-async - - 使用测试数据库测试迁移
integration-testing
Capability Details
能力详情
autogenerate-migrations
autogenerate-migrations
Keywords: autogenerate, auto-generate, revision, model sync, compare
Solves:
- Auto-generate migrations from SQLAlchemy models
- Sync database with model changes
- Detect schema drift
关键词: autogenerate, auto-generate, revision, model sync, compare
解决问题:
- 从SQLAlchemy模型自动生成迁移脚本
- 同步数据库与模型变更
- 检测Schema漂移
revision-management
revision-management
Keywords: upgrade, downgrade, rollback, history, current, revision
Solves:
- Apply or rollback migrations
- View migration history
- Check current database version
关键词: upgrade, downgrade, rollback, history, current, revision
解决问题:
- 应用或回滚迁移
- 查看迁移历史
- 检查当前数据库版本
zero-downtime-changes
zero-downtime-changes
Keywords: concurrent, expand contract, online migration, no downtime
Solves:
- Add indexes without locking
- Rename columns safely
- Large table migrations
关键词: concurrent, expand contract, online migration, no downtime
解决问题:
- 无锁添加索引
- 安全重命名列
- 大表迁移
data-migration
data-migration
Keywords: backfill, data migration, transform, batch update
Solves:
- Backfill new columns with data
- Transform existing data
- Migrate between column formats
关键词: backfill, data migration, transform, batch update
解决问题:
- 用数据回填新列
- 转换现有数据
- 在不同列格式间迁移
async-configuration
async-configuration
Keywords: asyncpg, async engine, env.py async, run_async_migrations
Solves:
- Configure Alembic for async SQLAlchemy
- Run migrations with asyncpg
- Handle existing event loop conflicts
关键词: asyncpg, async engine, env.py async, run_async_migrations
解决问题:
- 为异步SQLAlchemy配置Alembic
- 使用asyncpg运行迁移
- 处理现有事件循环冲突