alembic

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Alembic Database Migration Management Skill

Alembic数据库迁移管理Skill

Overview

概述

This skill provides comprehensive guidance for managing database migrations using Alembic in customer support environments. It covers everything from initial setup through complex production deployment scenarios, with a focus on maintaining data integrity and minimizing downtime for support operations.
本Skill为客户支持环境中使用Alembic管理数据库迁移提供全面指导,涵盖从初始设置到复杂生产部署场景的所有内容,重点在于维护数据完整性并最小化支持业务的停机时间。

Core Concepts

核心概念

What is Alembic?

什么是Alembic?

Alembic is a lightweight database migration tool for use with SQLAlchemy. It provides a way to manage changes to your database schema over time through version-controlled migration scripts. For customer support systems, this means:
  • Version Control: Track all schema changes in your support database
  • Reproducibility: Apply the same migrations across dev, staging, and production
  • Rollback Capability: Safely revert problematic changes
  • Team Collaboration: Merge schema changes from multiple developers
  • Data Preservation: Migrate data during schema transformations
Alembic是一款搭配SQLAlchemy使用的轻量级数据库迁移工具。它通过版本控制的迁移脚本,提供了随时间管理数据库架构变更的方式。对于客户支持系统而言,这意味着:
  • 版本控制:追踪支持数据库中的所有架构变更
  • 可复现性:在开发、预发布和生产环境中应用相同的迁移
  • 回滚能力:安全回滚有问题的变更
  • 团队协作:合并来自多名开发者的架构变更
  • 数据保留:在架构转换过程中迁移数据

Migration Lifecycle in Support Systems

支持系统中的迁移生命周期

  1. Development: Create migrations locally while developing new features
  2. Testing: Validate migrations in staging environment
  3. Review: Code review migration scripts before production
  4. Deployment: Apply migrations to production with minimal downtime
  5. Monitoring: Track migration status and handle failures
  6. Rollback: Revert if issues arise in production
  1. 开发阶段:在开发新功能时本地创建迁移
  2. 测试阶段:在预发布环境中验证迁移
  3. 评审阶段:在生产部署前对迁移脚本进行代码评审
  4. 部署阶段:以最小停机时间将迁移应用到生产环境
  5. 监控阶段:追踪迁移状态并处理故障
  6. 回滚阶段:若生产环境出现问题则执行回滚

Installation and Initial Setup

安装与初始设置

Installing Alembic

安装Alembic

bash
undefined
bash
undefined

Install Alembic with PostgreSQL support

Install Alembic with PostgreSQL support

pip install alembic psycopg2-binary sqlalchemy
pip install alembic psycopg2-binary sqlalchemy

Or add to requirements.txt

Or add to requirements.txt

alembic>=1.13.0 sqlalchemy>=2.0.0 psycopg2-binary>=2.9.0
undefined
alembic>=1.13.0 sqlalchemy>=2.0.0 psycopg2-binary>=2.9.0
undefined

Initialize Alembic in Your Project

在项目中初始化Alembic

bash
undefined
bash
undefined

Initialize Alembic (creates alembic/ directory and alembic.ini)

Initialize Alembic (creates alembic/ directory and alembic.ini)

alembic init alembic
alembic init alembic

For multiple database support

For multiple database support

alembic init --template multidb alembic

This creates:
- `alembic/`: Directory containing migration scripts
- `alembic/versions/`: Where individual migration files live
- `alembic/env.py`: Migration environment configuration
- `alembic.ini`: Alembic configuration file
alembic init --template multidb alembic

此操作会创建:
- `alembic/`: 存放迁移脚本的目录
- `alembic/versions/`: 单个迁移文件的存储位置
- `alembic/env.py`: 迁移环境配置文件
- `alembic.ini`: Alembic配置文件

Configure Database Connection

配置数据库连接

Edit
alembic.ini
to set your database URL:
ini
undefined
编辑
alembic.ini
设置数据库URL:
ini
undefined

For development

For development

sqlalchemy.url = postgresql://user:password@localhost/support_dev
sqlalchemy.url = postgresql://user:password@localhost/support_dev

For production (use environment variables)

For production (use environment variables)

sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s

Better approach - use environment variables in `env.py`:

```python
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s

更优方案 - 在`env.py`中使用环境变量:

```python
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

Import your models

Import your models

from myapp.models import Base
from myapp.models import Base

This is the Alembic Config object

This is the Alembic Config object

config = context.config
config = context.config

Override sqlalchemy.url from environment

Override sqlalchemy.url from environment

db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev') config.set_main_option('sqlalchemy.url', db_url)
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev') config.set_main_option('sqlalchemy.url', db_url)

Set up target metadata for autogenerate

Set up target metadata for autogenerate

target_metadata = Base.metadata
undefined
target_metadata = Base.metadata
undefined

Creating Migrations

创建迁移

Manual Migration Creation

手动创建迁移

Create a migration manually when you need precise control:
bash
undefined
当需要精确控制时,手动创建迁移:
bash
undefined

Create empty migration file

Create empty migration file

alembic revision -m "add ticket priority column"

This generates a file like `versions/abc123_add_ticket_priority_column.py`:

```python
"""add ticket priority column

Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""

from alembic import op
import sqlalchemy as sa
alembic revision -m "add ticket priority column"

这会生成类似`versions/abc123_add_ticket_priority_column.py`的文件:

```python
"""add ticket priority column

Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision identifiers

revision identifiers

revision = 'abc123' down_revision = 'def456' branch_labels = None depends_on = None
def upgrade() -> None: # Add priority column to tickets table op.add_column('tickets', sa.Column('priority', sa.String(20), nullable=True, server_default='normal') )
# Create index for performance
op.create_index('ix_tickets_priority', 'tickets', ['priority'])
def downgrade() -> None: # Remove index first op.drop_index('ix_tickets_priority', 'tickets')
# Remove column
op.drop_column('tickets', 'priority')
undefined
revision = 'abc123' down_revision = 'def456' branch_labels = None depends_on = None
def upgrade() -> None: # Add priority column to tickets table op.add_column('tickets', sa.Column('priority', sa.String(20), nullable=True, server_default='normal') )
# Create index for performance
op.create_index('ix_tickets_priority', 'tickets', ['priority'])
def downgrade() -> None: # Remove index first op.drop_index('ix_tickets_priority', 'tickets')
# Remove column
op.drop_column('tickets', 'priority')
undefined

Autogenerate Migrations

自动生成迁移

Let Alembic detect schema changes automatically:
bash
undefined
让Alembic自动检测架构变更:
bash
undefined

Generate migration by comparing models to database

Generate migration by comparing models to database

alembic revision --autogenerate -m "add customer satisfaction table"

**Important**: Always review autogenerated migrations! They may miss:
- Renamed columns (appears as drop + add)
- Changed column types requiring data conversion
- Complex constraints
- Data migrations

Example autogenerated migration:

```python
"""add customer satisfaction table

Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Auto-generated - review before running!
    op.create_table(
        'customer_satisfaction',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ticket_id', sa.Integer(), nullable=False),
        sa.Column('rating', sa.Integer(), nullable=False),
        sa.Column('feedback', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
    op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])

def downgrade() -> None:
    op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
    op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
    op.drop_table('customer_satisfaction')
alembic revision --autogenerate -m "add customer satisfaction table"

**重要提示**:务必评审自动生成的迁移!它们可能会遗漏:
- 重命名的列(显示为删除+添加)
- 需要数据转换的列类型变更
- 复杂约束
- 数据迁移

自动生成的迁移示例:

```python
"""add customer satisfaction table

Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Auto-generated - review before running!
    op.create_table(
        'customer_satisfaction',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('ticket_id', sa.Integer(), nullable=False),
        sa.Column('rating', sa.Integer(), nullable=False),
        sa.Column('feedback', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
    op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])

def downgrade() -> None:
    op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
    op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
    op.drop_table('customer_satisfaction')

Data Migrations

数据迁移

Migrating Data During Schema Changes

架构变更期间迁移数据

When you need to transform existing data:
python
"""convert ticket status to new enum

Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column

revision = 'data001'
down_revision = 'xyz789'

def upgrade() -> None:
    # Create new status column
    op.add_column('tickets',
        sa.Column('status_new', sa.String(50), nullable=True)
    )

    # Migrate data using bulk update
    tickets = table('tickets',
        column('status', sa.String),
        column('status_new', sa.String)
    )

    # Map old statuses to new ones
    status_mapping = {
        'open': 'OPEN',
        'in_progress': 'IN_PROGRESS',
        'pending': 'WAITING_ON_CUSTOMER',
        'resolved': 'RESOLVED',
        'closed': 'CLOSED'
    }

    connection = op.get_bind()
    for old_status, new_status in status_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == old_status
            ).values(status_new=new_status)
        )

    # Make new column non-nullable now that data is migrated
    op.alter_column('tickets', 'status_new', nullable=False)

    # Drop old column and rename new one
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_new', new_column_name='status')

def downgrade() -> None:
    # Reverse the migration
    op.add_column('tickets',
        sa.Column('status_old', sa.String(50), nullable=True)
    )

    tickets = table('tickets',
        column('status', sa.String),
        column('status_old', sa.String)
    )

    # Reverse mapping
    reverse_mapping = {
        'OPEN': 'open',
        'IN_PROGRESS': 'in_progress',
        'WAITING_ON_CUSTOMER': 'pending',
        'RESOLVED': 'resolved',
        'CLOSED': 'closed'
    }

    connection = op.get_bind()
    for new_status, old_status in reverse_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == new_status
            ).values(status_old=old_status)
        )

    op.alter_column('tickets', 'status_old', nullable=False)
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_old', new_column_name='status')
当需要转换现有数据时:
python
"""convert ticket status to new enum

Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column

revision = 'data001'
down_revision = 'xyz789'

def upgrade() -> None:
    # Create new status column
    op.add_column('tickets',
        sa.Column('status_new', sa.String(50), nullable=True)
    )

    # Migrate data using bulk update
    tickets = table('tickets',
        column('status', sa.String),
        column('status_new', sa.String)
    )

    # Map old statuses to new ones
    status_mapping = {
        'open': 'OPEN',
        'in_progress': 'IN_PROGRESS',
        'pending': 'WAITING_ON_CUSTOMER',
        'resolved': 'RESOLVED',
        'closed': 'CLOSED'
    }

    connection = op.get_bind()
    for old_status, new_status in status_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == old_status
            ).values(status_new=new_status)
        )

    # Make new column non-nullable now that data is migrated
    op.alter_column('tickets', 'status_new', nullable=False)

    # Drop old column and rename new one
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_new', new_column_name='status')

def downgrade() -> None:
    # Reverse the migration
    op.add_column('tickets',
        sa.Column('status_old', sa.String(50), nullable=True)
    )

    tickets = table('tickets',
        column('status', sa.String),
        column('status_old', sa.String)
    )

    # Reverse mapping
    reverse_mapping = {
        'OPEN': 'open',
        'IN_PROGRESS': 'in_progress',
        'WAITING_ON_CUSTOMER': 'pending',
        'RESOLVED': 'resolved',
        'CLOSED': 'closed'
    }

    connection = op.get_bind()
    for new_status, old_status in reverse_mapping.items():
        connection.execute(
            tickets.update().where(
                tickets.c.status == new_status
            ).values(status_old=old_status)
        )

    op.alter_column('tickets', 'status_old', nullable=False)
    op.drop_column('tickets', 'status')
    op.alter_column('tickets', 'status_old', new_column_name='status')

Large Data Migrations with Batching

批量处理的大数据迁移

For large tables, process data in batches:
python
"""add computed resolution time to tickets

Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select

revision = 'data002'
down_revision = 'data001'

def upgrade() -> None:
    # Add new column
    op.add_column('tickets',
        sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
    )

    connection = op.get_bind()
    tickets = table('tickets',
        column('id', sa.Integer),
        column('created_at', sa.DateTime),
        column('resolved_at', sa.DateTime),
        column('resolution_time_seconds', sa.Integer)
    )

    # Process in batches to avoid memory issues
    batch_size = 1000
    offset = 0

    while True:
        # Get batch of tickets that need processing
        batch = connection.execute(
            select(
                tickets.c.id,
                tickets.c.created_at,
                tickets.c.resolved_at
            ).where(
                sa.and_(
                    tickets.c.resolved_at.isnot(None),
                    tickets.c.resolution_time_seconds.is_(None)
                )
            ).limit(batch_size).offset(offset)
        ).fetchall()

        if not batch:
            break

        # Update batch
        for row in batch:
            if row.resolved_at and row.created_at:
                resolution_time = (row.resolved_at - row.created_at).total_seconds()
                connection.execute(
                    tickets.update().where(
                        tickets.c.id == row.id
                    ).values(resolution_time_seconds=int(resolution_time))
                )

        offset += batch_size

    # Now make column non-nullable for future rows
    op.alter_column('tickets', 'resolution_time_seconds',
        nullable=False, server_default='0')

def downgrade() -> None:
    op.drop_column('tickets', 'resolution_time_seconds')
对于大型表,分批处理数据:
python
"""add computed resolution time to tickets

Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select

revision = 'data002'
down_revision = 'data001'

def upgrade() -> None:
    # Add new column
    op.add_column('tickets',
        sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
    )

    connection = op.get_bind()
    tickets = table('tickets',
        column('id', sa.Integer),
        column('created_at', sa.DateTime),
        column('resolved_at', sa.DateTime),
        column('resolution_time_seconds', sa.Integer)
    )

    # Process in batches to avoid memory issues
    batch_size = 1000
    offset = 0

    while True:
        # Get batch of tickets that need processing
        batch = connection.execute(
            select(
                tickets.c.id,
                tickets.c.created_at,
                tickets.c.resolved_at
            ).where(
                sa.and_(
                    tickets.c.resolved_at.isnot(None),
                    tickets.c.resolution_time_seconds.is_(None)
                )
            ).limit(batch_size).offset(offset)
        ).fetchall()

        if not batch:
            break

        # Update batch
        for row in batch:
            if row.resolved_at and row.created_at:
                resolution_time = (row.resolved_at - row.created_at).total_seconds()
                connection.execute(
                    tickets.update().where(
                        tickets.c.id == row.id
                    ).values(resolution_time_seconds=int(resolution_time))
                )

        offset += batch_size

    # Now make column non-nullable for future rows
    op.alter_column('tickets', 'resolution_time_seconds',
        nullable=False, server_default='0')

def downgrade() -> None:
    op.drop_column('tickets', 'resolution_time_seconds')

Running Migrations

执行迁移

Upgrade Database to Latest

将数据库升级到最新版本

bash
undefined
bash
undefined

Upgrade to latest revision (head)

Upgrade to latest revision (head)

alembic upgrade head
alembic upgrade head

See what would be executed (SQL only, don't run)

See what would be executed (SQL only, don't run)

alembic upgrade head --sql
alembic upgrade head --sql

Upgrade one step at a time

Upgrade one step at a time

alembic upgrade +1
alembic upgrade +1

Upgrade to specific revision

Upgrade to specific revision

alembic upgrade abc123
undefined
alembic upgrade abc123
undefined

Downgrade Database

回滚数据库

bash
undefined
bash
undefined

Downgrade one revision

Downgrade one revision

alembic downgrade -1
alembic downgrade -1

Downgrade to specific revision

Downgrade to specific revision

alembic downgrade abc123
alembic downgrade abc123

Downgrade to base (empty database)

Downgrade to base (empty database)

alembic downgrade base
alembic downgrade base

Generate SQL for downgrade without executing

Generate SQL for downgrade without executing

alembic downgrade -1 --sql
undefined
alembic downgrade -1 --sql
undefined

Check Current Status

查看当前状态

bash
undefined
bash
undefined

Show current database revision

Show current database revision

alembic current
alembic current

Show current revision with details

Show current revision with details

alembic current --verbose
alembic current --verbose

Show migration history

Show migration history

alembic history
alembic history

Show history with current revision marked

Show history with current revision marked

alembic history --indicate-current
alembic history --indicate-current

Show specific revision range

Show specific revision range

alembic history -r base:head
undefined
alembic history -r base:head
undefined

Branching and Merging

分支与合并

Why Branch Migrations?

为什么要对迁移进行分支?

In customer support systems, you might have:
  • Feature branches: New features developed in parallel
  • Hotfix branches: Urgent fixes that can't wait for feature completion
  • Team branches: Multiple teams working on different modules
在客户支持系统中,你可能会遇到以下场景:
  • 功能分支:并行开发的新功能
  • 热修复分支:无法等待功能完成的紧急修复
  • 团队分支:多个团队在不同模块上工作

Creating a Branch

创建分支

bash
undefined
bash
undefined

Create base for new branch

Create base for new branch

alembic revision -m "create reporting branch"
--head=base
--branch-label=reporting
--version-path=alembic/versions/reporting
alembic revision -m "create reporting branch"
--head=base
--branch-label=reporting
--version-path=alembic/versions/reporting

Add migration to specific branch

Add migration to specific branch

alembic revision -m "add report tables"
--head=reporting@head

Example branch structure:
base ├── main branch │ ├── abc123: initial schema │ ├── def456: add tickets │ └── ghi789: add users └── reporting branch ├── rep001: create reports table └── rep002: add scheduled reports
undefined
alembic revision -m "add report tables"
--head=reporting@head

分支结构示例:
base ├── main branch │ ├── abc123: initial schema │ ├── def456: add tickets │ └── ghi789: add users └── reporting branch ├── rep001: create reports table └── rep002: add scheduled reports
undefined

Working with Multiple Branches

多分支协作

bash
undefined
bash
undefined

Show all branch heads

Show all branch heads

alembic heads
alembic heads

Show branch points

Show branch points

alembic branches
alembic branches

Upgrade specific branch

Upgrade specific branch

alembic upgrade reporting@head
alembic upgrade reporting@head

Upgrade all branches

Upgrade all branches

alembic upgrade heads
undefined
alembic upgrade heads
undefined

Merging Branches

合并分支

When features are ready to merge:
bash
undefined
当功能开发完成准备合并时:
bash
undefined

Merge two branches

Merge two branches

alembic merge -m "merge reporting into main"
main@head reporting@head

Generated merge migration:

```python
"""merge reporting into main

Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision = 'merge001'
down_revision = ('ghi789', 'rep002')  # Multiple parents
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Usually empty for simple merges
    # Add code if you need to reconcile conflicting changes
    pass

def downgrade() -> None:
    pass
alembic merge -m "merge reporting into main"
main@head reporting@head

生成的合并迁移:

```python
"""merge reporting into main

Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""

from alembic import op
import sqlalchemy as sa

revision = 'merge001'
down_revision = ('ghi789', 'rep002')  # Multiple parents
branch_labels = None
depends_on = None

def upgrade() -> None:
    # Usually empty for simple merges
    # Add code if you need to reconcile conflicting changes
    pass

def downgrade() -> None:
    pass

Cross-Branch Dependencies

跨分支依赖

When one branch depends on another:
bash
undefined
当一个分支依赖另一个分支时:
bash
undefined

Create migration that depends on specific revision from another branch

Create migration that depends on specific revision from another branch

alembic revision -m "reporting needs user table"
--head=reporting@head
--depends-on=def456 # Revision from main branch
undefined
alembic revision -m "reporting needs user table"
--head=reporting@head
--depends-on=def456 # Revision from main branch
undefined

Testing Migrations

迁移测试

Unit Testing Migrations

单元测试迁移

python
undefined
python
undefined

tests/test_migrations.py

tests/test_migrations.py

import pytest from alembic import command from alembic.config import Config from sqlalchemy import create_engine, inspect from sqlalchemy.orm import sessionmaker
@pytest.fixture def alembic_config(): """Provide Alembic configuration for testing""" config = Config("alembic.ini") config.set_main_option( "sqlalchemy.url", "postgresql://localhost/support_test" ) return config
@pytest.fixture def test_db(alembic_config): """Create test database and apply migrations""" # Create engine engine = create_engine( alembic_config.get_main_option("sqlalchemy.url") )
# Run migrations to head
command.upgrade(alembic_config, "head")

yield engine

# Cleanup - downgrade to base
command.downgrade(alembic_config, "base")
engine.dispose()
def test_migration_creates_tickets_table(test_db): """Test that migrations create expected tables""" inspector = inspect(test_db) tables = inspector.get_table_names()
assert 'tickets' in tables
assert 'users' in tables
assert 'customer_satisfaction' in tables
def test_tickets_table_structure(test_db): """Test ticket table has correct columns""" inspector = inspect(test_db) columns = {col['name']: col for col in inspector.get_columns('tickets')}
assert 'id' in columns
assert 'priority' in columns
assert 'status' in columns
assert 'created_at' in columns
assert 'resolution_time_seconds' in columns

# Check column types
assert columns['priority']['type'].python_type == str
assert columns['status']['type'].python_type == str
def test_migration_upgrade_downgrade_cycle(alembic_config): """Test that upgrade -> downgrade -> upgrade works""" # Start at base command.downgrade(alembic_config, "base")
# Upgrade to head
command.upgrade(alembic_config, "head")

# Downgrade one step
command.downgrade(alembic_config, "-1")

# Upgrade back to head
command.upgrade(alembic_config, "head")

# Should complete without errors
def test_data_migration_preserves_data(test_db): """Test that data migrations don't lose data""" from sqlalchemy.orm import sessionmaker from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()

# Insert test data
ticket = Ticket(
    title="Test ticket",
    status="OPEN",
    priority="high"
)
session.add(ticket)
session.commit()
ticket_id = ticket.id
session.close()

# Run a migration that modifies tickets table
# (This would be a specific revision)
# command.upgrade(alembic_config, "specific_revision")

# Verify data still exists
session = Session()
retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
assert retrieved is not None
assert retrieved.title == "Test ticket"
session.close()
undefined
import pytest from alembic import command from alembic.config import Config from sqlalchemy import create_engine, inspect from sqlalchemy.orm import sessionmaker
@pytest.fixture def alembic_config(): """Provide Alembic configuration for testing""" config = Config("alembic.ini") config.set_main_option( "sqlalchemy.url", "postgresql://localhost/support_test" ) return config
@pytest.fixture def test_db(alembic_config): """Create test database and apply migrations""" # Create engine engine = create_engine( alembic_config.get_main_option("sqlalchemy.url") )
# Run migrations to head
command.upgrade(alembic_config, "head")

yield engine

# Cleanup - downgrade to base
command.downgrade(alembic_config, "base")
engine.dispose()
def test_migration_creates_tickets_table(test_db): """Test that migrations create expected tables""" inspector = inspect(test_db) tables = inspector.get_table_names()
assert 'tickets' in tables
assert 'users' in tables
assert 'customer_satisfaction' in tables
def test_tickets_table_structure(test_db): """Test ticket table has correct columns""" inspector = inspect(test_db) columns = {col['name']: col for col in inspector.get_columns('tickets')}
assert 'id' in columns
assert 'priority' in columns
assert 'status' in columns
assert 'created_at' in columns
assert 'resolution_time_seconds' in columns

# Check column types
assert columns['priority']['type'].python_type == str
assert columns['status']['type'].python_type == str
def test_migration_upgrade_downgrade_cycle(alembic_config): """Test that upgrade -> downgrade -> upgrade works""" # Start at base command.downgrade(alembic_config, "base")
# Upgrade to head
command.upgrade(alembic_config, "head")

# Downgrade one step
command.downgrade(alembic_config, "-1")

# Upgrade back to head
command.upgrade(alembic_config, "head")

# Should complete without errors
def test_data_migration_preserves_data(test_db): """Test that data migrations don't lose data""" from sqlalchemy.orm import sessionmaker from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()

# Insert test data
ticket = Ticket(
    title="Test ticket",
    status="OPEN",
    priority="high"
)
session.add(ticket)
session.commit()
ticket_id = ticket.id
session.close()

# Run a migration that modifies tickets table
# (This would be a specific revision)
# command.upgrade(alembic_config, "specific_revision")

# Verify data still exists
session = Session()
retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
assert retrieved is not None
assert retrieved.title == "Test ticket"
session.close()
undefined

Integration Testing

集成测试

python
undefined
python
undefined

tests/test_migration_integration.py

tests/test_migration_integration.py

import pytest from alembic import command from alembic.config import Config from alembic.script import ScriptDirectory from alembic.runtime.migration import MigrationContext
def test_no_pending_migrations(alembic_config, test_db): """Ensure all migrations are applied in test environment""" script = ScriptDirectory.from_config(alembic_config)
with test_db.connect() as connection:
    context = MigrationContext.configure(connection)
    current_heads = set(context.get_current_heads())
    script_heads = set(script.get_heads())

    assert current_heads == script_heads, \
        f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"
def test_migration_order_is_valid(alembic_config): """Verify migration chain has no gaps or conflicts""" script = ScriptDirectory.from_config(alembic_config)
# Get all revisions
revisions = list(script.walk_revisions())

# Check each revision has valid down_revision
for revision in revisions:
    if revision.down_revision is not None:
        if isinstance(revision.down_revision, tuple):
            # Merge point
            for down_rev in revision.down_revision:
                assert script.get_revision(down_rev) is not None
        else:
            assert script.get_revision(revision.down_revision) is not None
def test_check_command_detects_drift(alembic_config, test_db): """Test that check command detects schema drift""" # This test verifies that
alembic check
works correctly try: command.check(alembic_config) # If no exception, database matches models assert True except Exception as e: # If exception, there's drift between DB and models pytest.fail(f"Schema drift detected: {e}")
undefined
import pytest from alembic import command from alembic.config import Config from alembic.script import ScriptDirectory from alembic.runtime.migration import MigrationContext
def test_no_pending_migrations(alembic_config, test_db): """Ensure all migrations are applied in test environment""" script = ScriptDirectory.from_config(alembic_config)
with test_db.connect() as connection:
    context = MigrationContext.configure(connection)
    current_heads = set(context.get_current_heads())
    script_heads = set(script.get_heads())

    assert current_heads == script_heads, \
        f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"
def test_migration_order_is_valid(alembic_config): """Verify migration chain has no gaps or conflicts""" script = ScriptDirectory.from_config(alembic_config)
# Get all revisions
revisions = list(script.walk_revisions())

# Check each revision has valid down_revision
for revision in revisions:
    if revision.down_revision is not None:
        if isinstance(revision.down_revision, tuple):
            # Merge point
            for down_rev in revision.down_revision:
                assert script.get_revision(down_rev) is not None
        else:
            assert script.get_revision(revision.down_revision) is not None
def test_check_command_detects_drift(alembic_config, test_db): """Test that check command detects schema drift""" # This test verifies that
alembic check
works correctly try: command.check(alembic_config) # If no exception, database matches models assert True except Exception as e: # If exception, there's drift between DB and models pytest.fail(f"Schema drift detected: {e}")
undefined

Testing Migration Performance

迁移性能测试

python
undefined
python
undefined

tests/test_migration_performance.py

tests/test_migration_performance.py

import time import pytest from alembic import command
def test_migration_completes_within_time_limit(alembic_config): """Ensure migrations complete within acceptable time""" # Downgrade to base command.downgrade(alembic_config, "base")
# Time the upgrade
start = time.time()
command.upgrade(alembic_config, "head")
duration = time.time() - start

# Assert completes within 60 seconds
assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"
@pytest.mark.slow def test_data_migration_with_large_dataset(alembic_config, test_db): """Test data migration performance with realistic data volume""" from sqlalchemy.orm import sessionmaker from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()

# Create 10,000 test tickets
tickets = [
    Ticket(
        title=f"Test ticket {i}",
        status="OPEN",
        priority="normal"
    )
    for i in range(10000)
]
session.bulk_save_objects(tickets)
session.commit()
session.close()

# Run data migration and measure time
start = time.time()
command.upgrade(alembic_config, "data002")  # Specific data migration
duration = time.time() - start

# Should process 10k records in reasonable time
assert duration < 30, f"Data migration took {duration}s for 10k records"
undefined
import time import pytest from alembic import command
def test_migration_completes_within_time_limit(alembic_config): """Ensure migrations complete within acceptable time""" # Downgrade to base command.downgrade(alembic_config, "base")
# Time the upgrade
start = time.time()
command.upgrade(alembic_config, "head")
duration = time.time() - start

# Assert completes within 60 seconds
assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"
@pytest.mark.slow def test_data_migration_with_large_dataset(alembic_config, test_db): """Test data migration performance with realistic data volume""" from sqlalchemy.orm import sessionmaker from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()

# Create 10,000 test tickets
tickets = [
    Ticket(
        title=f"Test ticket {i}",
        status="OPEN",
        priority="normal"
    )
    for i in range(10000)
]
session.bulk_save_objects(tickets)
session.commit()
session.close()

# Run data migration and measure time
start = time.time()
command.upgrade(alembic_config, "data002")  # Specific data migration
duration = time.time() - start

# Should process 10k records in reasonable time
assert duration < 30, f"Data migration took {duration}s for 10k records"
undefined

CI/CD Integration

CI/CD集成

GitHub Actions Workflow

GitHub Actions工作流

yaml
undefined
yaml
undefined

.github/workflows/migrations.yml

.github/workflows/migrations.yml

name: Database Migrations
on: pull_request: paths: - 'alembic/versions/' - 'myapp/models/' - 'alembic.ini' - 'alembic/env.py' push: branches: - main - develop
jobs: test-migrations: runs-on: ubuntu-latest
services:
  postgres:
    image: postgres:15
    env:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: support_test
    options: >-
      --health-cmd pg_isready
      --health-interval 10s
      --health-timeout 5s
      --health-retries 5
    ports:
      - 5432:5432

steps:
  - uses: actions/checkout@v3

  - name: Set up Python
    uses: actions/setup-python@v4
    with:
      python-version: '3.11'

  - name: Install dependencies
    run: |
      pip install -r requirements.txt
      pip install pytest pytest-cov

  - name: Run migration tests
    env:
      DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
    run: |
      # Test upgrade to head
      alembic upgrade head

      # Test downgrade to base
      alembic downgrade base

      # Test upgrade again
      alembic upgrade head

      # Run pytest for migration tests
      pytest tests/test_migrations.py -v

  - name: Check for schema drift
    env:
      DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
    run: |
      alembic check

  - name: Validate migration history
    run: |
      # Check for multiple heads (should be only one)
      HEADS_COUNT=$(alembic heads | wc -l)
      if [ "$HEADS_COUNT" -gt 1 ]; then
        echo "ERROR: Multiple heads detected. Please merge branches."
        alembic heads
        exit 1
      fi
review-migration-sql: runs-on: ubuntu-latest if: github.event_name == 'pull_request'
steps:
  - uses: actions/checkout@v3

  - name: Set up Python
    uses: actions/setup-python@v4
    with:
      python-version: '3.11'

  - name: Install dependencies
    run: pip install -r requirements.txt

  - name: Generate SQL for review
    run: |
      # Generate SQL without executing
      alembic upgrade head --sql > migration.sql

  - name: Upload SQL artifact
    uses: actions/upload-artifact@v3
    with:
      name: migration-sql
      path: migration.sql

  - name: Comment PR with SQL
    uses: actions/github-script@v6
    with:
      script: |
        const fs = require('fs');
        const sql = fs.readFileSync('migration.sql', 'utf8');

        github.rest.issues.createComment({
          issue_number: context.issue.number,
          owner: context.repo.owner,
          repo: context.repo.repo,
          body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
        });
undefined
name: Database Migrations
on: pull_request: paths: - 'alembic/versions/' - 'myapp/models/' - 'alembic.ini' - 'alembic/env.py' push: branches: - main - develop
jobs: test-migrations: runs-on: ubuntu-latest
services:
  postgres:
    image: postgres:15
    env:
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: support_test
    options: >-
      --health-cmd pg_isready
      --health-interval 10s
      --health-timeout 5s
      --health-retries 5
    ports:
      - 5432:5432

steps:
  - uses: actions/checkout@v3

  - name: Set up Python
    uses: actions/setup-python@v4
    with:
      python-version: '3.11'

  - name: Install dependencies
    run: |
      pip install -r requirements.txt
      pip install pytest pytest-cov

  - name: Run migration tests
    env:
      DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
    run: |
      # Test upgrade to head
      alembic upgrade head

      # Test downgrade to base
      alembic downgrade base

      # Test upgrade again
      alembic upgrade head

      # Run pytest for migration tests
      pytest tests/test_migrations.py -v

  - name: Check for schema drift
    env:
      DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
    run: |
      alembic check

  - name: Validate migration history
    run: |
      # Check for multiple heads (should be only one)
      HEADS_COUNT=$(alembic heads | wc -l)
      if [ "$HEADS_COUNT" -gt 1 ]; then
        echo "ERROR: Multiple heads detected. Please merge branches."
        alembic heads
        exit 1
      fi
review-migration-sql: runs-on: ubuntu-latest if: github.event_name == 'pull_request'
steps:
  - uses: actions/checkout@v3

  - name: Set up Python
    uses: actions/setup-python@v4
    with:
      python-version: '3.11'

  - name: Install dependencies
    run: pip install -r requirements.txt

  - name: Generate SQL for review
    run: |
      # Generate SQL without executing
      alembic upgrade head --sql > migration.sql

  - name: Upload SQL artifact
    uses: actions/upload-artifact@v3
    with:
      name: migration-sql
      path: migration.sql

  - name: Comment PR with SQL
    uses: actions/github-script@v6
    with:
      script: |
        const fs = require('fs');
        const sql = fs.readFileSync('migration.sql', 'utf8');

        github.rest.issues.createComment({
          issue_number: context.issue.number,
          owner: context.repo.owner,
          repo: context.repo.repo,
          body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
        });
undefined

Deployment Script

部署脚本

bash
#!/bin/bash
bash
#!/bin/bash

scripts/deploy_migrations.sh

scripts/deploy_migrations.sh

set -e # Exit on error
echo "Starting database migration deployment..."
set -e # Exit on error
echo "Starting database migration deployment..."

Environment variables

Environment variables

DB_HOST="${DB_HOST:-localhost}" DB_NAME="${DB_NAME:-support_prod}" DB_USER="${DB_USER:-postgres}" DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"
DB_HOST="${DB_HOST:-localhost}" DB_NAME="${DB_NAME:-support_prod}" DB_USER="${DB_USER:-postgres}" DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"

Configuration

Configuration

BACKUP_DIR="./backups" TIMESTAMP=$(date +%Y%m%d_%H%M%S) BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"
BACKUP_DIR="./backups" TIMESTAMP=$(date +%Y%m%d_%H%M%S) BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"

Create backup directory

Create backup directory

mkdir -p "$BACKUP_DIR"
mkdir -p "$BACKUP_DIR"

1. Backup database before migration

1. Backup database before migration

echo "Creating database backup..." pg_dump "$DATABASE_URL" > "$BACKUP_FILE" echo "Backup created: $BACKUP_FILE"
echo "Creating database backup..." pg_dump "$DATABASE_URL" > "$BACKUP_FILE" echo "Backup created: $BACKUP_FILE"

2. Check current migration status

2. Check current migration status

echo "Current migration status:" alembic current
echo "Current migration status:" alembic current

3. Show pending migrations

3. Show pending migrations

echo "Pending migrations:" alembic history --verbose | grep -A 5 "head"
echo "Pending migrations:" alembic history --verbose | grep -A 5 "head"

4. Run migrations with timeout

4. Run migrations with timeout

echo "Running migrations..." timeout 300 alembic upgrade head || { echo "ERROR: Migration failed or timed out!" echo "Restoring from backup..." psql "$DATABASE_URL" < "$BACKUP_FILE" exit 1 }
echo "Running migrations..." timeout 300 alembic upgrade head || { echo "ERROR: Migration failed or timed out!" echo "Restoring from backup..." psql "$DATABASE_URL" < "$BACKUP_FILE" exit 1 }

5. Verify migration success

5. Verify migration success

echo "Verifying migration status..." CURRENT_REV=$(alembic current | grep "Rev:" | awk '{print $2}') HEAD_REV=$(alembic heads | awk '{print $1}')
if [ "$CURRENT_REV" != "$HEAD_REV" ]; then echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV" echo "Restoring from backup..." psql "$DATABASE_URL" < "$BACKUP_FILE" exit 1 fi
echo "Migration completed successfully!" echo "Current revision: $CURRENT_REV"
echo "Verifying migration status..." CURRENT_REV=$(alembic current | grep "Rev:" | awk '{print $2}') HEAD_REV=$(alembic heads | awk '{print $1}')
if [ "$CURRENT_REV" != "$HEAD_REV" ]; then echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV" echo "Restoring from backup..." psql "$DATABASE_URL" < "$BACKUP_FILE" exit 1 fi
echo "Migration completed successfully!" echo "Current revision: $CURRENT_REV"

6. Cleanup old backups (keep last 10)

6. Cleanup old backups (keep last 10)

echo "Cleaning up old backups..." ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm
echo "Deployment complete!"
undefined
echo "Cleaning up old backups..." ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm
echo "Deployment complete!"
undefined

Production Best Practices

生产环境最佳实践

Pre-Deployment Checklist

部署前检查清单

  • Migration tested in development environment
  • Migration tested in staging with production-like data
  • Migration reviewed by at least one team member
  • Downgrade path tested and verified
  • Performance impact assessed for large tables
  • Database backup plan in place
  • Rollback procedure documented
  • Maintenance window scheduled (if needed)
  • Team notified of deployment
  • Monitoring alerts configured
  • 迁移已在开发环境中测试
  • 迁移已在类生产数据的预发布环境中测试
  • 迁移已至少经过一名团队成员评审
  • 回滚路径已测试并验证
  • 已评估对大型表的性能影响
  • 已制定数据库备份计划
  • 已记录回滚流程
  • 已安排维护窗口(若需要)
  • 已通知团队部署事宜
  • 已配置监控告警

Zero-Downtime Migrations

零停机迁移

For critical support systems that can't go offline:
Phase 1: Additive Changes
python
"""add new column (phase 1)

Revision ID: zd001
"""

def upgrade() -> None:
    # Add new column as nullable
    op.add_column('tickets',
        sa.Column('new_field', sa.String(100), nullable=True)
    )

def downgrade() -> None:
    op.drop_column('tickets', 'new_field')
Phase 2: Data Migration (Background)
python
"""populate new column (phase 2)

Revision ID: zd002
"""

def upgrade() -> None:
    # Update in small batches during low-traffic periods
    connection = op.get_bind()

    batch_size = 100
    while True:
        result = connection.execute(
            """
            UPDATE tickets
            SET new_field = calculate_value(old_field)
            WHERE new_field IS NULL
            LIMIT {batch_size}
            """.format(batch_size=batch_size)
        )
        if result.rowcount == 0:
            break

        # Small delay to reduce database load
        import time
        time.sleep(0.1)

def downgrade() -> None:
    connection = op.get_bind()
    connection.execute("UPDATE tickets SET new_field = NULL")
Phase 3: Make Required
python
"""make new column required (phase 3)

Revision ID: zd003
"""

def upgrade() -> None:
    # Now that all rows have values, make it non-nullable
    op.alter_column('tickets', 'new_field',
        nullable=False,
        server_default='default_value'
    )

def downgrade() -> None:
    op.alter_column('tickets', 'new_field',
        nullable=True,
        server_default=None
    )
Phase 4: Remove Old Column (Optional)
python
"""remove old column (phase 4)

Revision ID: zd004
"""

def upgrade() -> None:
    op.drop_column('tickets', 'old_field')

def downgrade() -> None:
    op.add_column('tickets',
        sa.Column('old_field', sa.String(100), nullable=True)
    )
对于无法离线的关键支持系统:
阶段1:添加式变更
python
"""add new column (phase 1)

Revision ID: zd001
"""

def upgrade() -> None:
    # Add new column as nullable
    op.add_column('tickets',
        sa.Column('new_field', sa.String(100), nullable=True)
    )

def downgrade() -> None:
    op.drop_column('tickets', 'new_field')
阶段2:后台数据迁移
python
"""populate new column (phase 2)

Revision ID: zd002
"""

def upgrade() -> None:
    # Update in small batches during low-traffic periods
    connection = op.get_bind()

    batch_size = 100
    while True:
        result = connection.execute(
            """
            UPDATE tickets
            SET new_field = calculate_value(old_field)
            WHERE new_field IS NULL
            LIMIT {batch_size}
            """.format(batch_size=batch_size)
        )
        if result.rowcount == 0:
            break

        # Small delay to reduce database load
        import time
        time.sleep(0.1)

def downgrade() -> None:
    connection = op.get_bind()
    connection.execute("UPDATE tickets SET new_field = NULL")
阶段3:设为必填项
python
"""make new column required (phase 3)

Revision ID: zd003
"""

def upgrade() -> None:
    # Now that all rows have values, make it non-nullable
    op.alter_column('tickets', 'new_field',
        nullable=False,
        server_default='default_value'
    )

def downgrade() -> None:
    op.alter_column('tickets', 'new_field',
        nullable=True,
        server_default=None
    )
阶段4:移除旧列(可选)
python
"""remove old column (phase 4)

Revision ID: zd004
"""

def upgrade() -> None:
    op.drop_column('tickets', 'old_field')

def downgrade() -> None:
    op.add_column('tickets',
        sa.Column('old_field', sa.String(100), nullable=True)
    )

Handling Migration Failures

处理迁移失败

python
undefined
python
undefined

alembic/env.py additions for error handling

alembic/env.py additions for error handling

from alembic import context import logging
logger = logging.getLogger('alembic.env')
def run_migrations_online(): """Run migrations in 'online' mode with error handling"""
connectable = engine_from_config(
    config.get_section(config.config_ini_section),
    prefix='sqlalchemy.',
    poolclass=pool.NullPool,
)

with connectable.connect() as connection:
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        transaction_per_migration=True,  # Rollback individual migrations
        compare_type=True,
        compare_server_default=True
    )

    try:
        with context.begin_transaction():
            context.run_migrations()

    except Exception as e:
        logger.error(f"Migration failed: {e}")
        logger.error("Rolling back transaction...")
        # Transaction automatically rolled back
        raise

    else:
        logger.info("Migration completed successfully")
undefined
from alembic import context import logging
logger = logging.getLogger('alembic.env')
def run_migrations_online(): """Run migrations in 'online' mode with error handling"""
connectable = engine_from_config(
    config.get_section(config.config_ini_section),
    prefix='sqlalchemy.',
    poolclass=pool.NullPool,
)

with connectable.connect() as connection:
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        transaction_per_migration=True,  # Rollback individual migrations
        compare_type=True,
        compare_server_default=True
    )

    try:
        with context.begin_transaction():
            context.run_migrations()

    except Exception as e:
        logger.error(f"Migration failed: {e}")
        logger.error("Rolling back transaction...")
        # Transaction automatically rolled back
        raise

    else:
        logger.info("Migration completed successfully")
undefined

Advanced Configuration

高级配置

Custom Migration Template

自定义迁移模板

Create custom template for your organization:
python
undefined
为你的组织创建自定义模板:
python
undefined

alembic/script.py.mako

alembic/script.py.mako

"""${message}
Revision ID: ${up_revision} Revises: ${down_revision | comma,n} Create Date: ${create_date}
Author: ${author if author else 'Support Team'} Jira: ${jira_ticket if jira_ticket else 'N/A'} """
from alembic import op import sqlalchemy as sa ${imports if imports else ""}
"""${message}
Revision ID: ${up_revision} Revises: ${down_revision | comma,n} Create Date: ${create_date}
Author: ${author if author else 'Support Team'} Jira: ${jira_ticket if jira_ticket else 'N/A'} """
from alembic import op import sqlalchemy as sa ${imports if imports else ""}

revision identifiers, used by Alembic.

revision identifiers, used by Alembic.

revision = ${repr(up_revision)} down_revision = ${repr(down_revision)} branch_labels = ${repr(branch_labels)} depends_on = ${repr(depends_on)}
def upgrade() -> None: """Apply migration changes""" ${upgrades if upgrades else "pass"}
def downgrade() -> None: """Revert migration changes""" ${downgrades if downgrades else "pass"}
undefined
revision = ${repr(up_revision)} down_revision = ${repr(down_revision)} branch_labels = ${repr(branch_labels)} depends_on = ${repr(depends_on)}
def upgrade() -> None: """Apply migration changes""" ${upgrades if upgrades else "pass"}
def downgrade() -> None: """Revert migration changes""" ${downgrades if downgrades else "pass"}
undefined

Multi-Database Support

多数据库支持

For systems with separate databases (e.g., main DB + analytics):
python
undefined
对于拥有独立数据库的系统(例如主数据库+分析数据库):
python
undefined

alembic/env.py for multiple databases

alembic/env.py for multiple databases

def run_migrations_online(): """Run migrations for multiple databases"""
# Configuration for each database
engines = {
    'main': {
        'url': os.getenv('MAIN_DB_URL'),
        'target_metadata': main_metadata
    },
    'analytics': {
        'url': os.getenv('ANALYTICS_DB_URL'),
        'target_metadata': analytics_metadata
    }
}

for name, config in engines.items():
    logger.info(f"Running migrations for {name} database")

    engine = create_engine(config['url'])

    with engine.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=config['target_metadata'],
            upgrade_token=f"{name}_upgrade",
            downgrade_token=f"{name}_downgrade"
        )

        with context.begin_transaction():
            context.run_migrations(engine_name=name)
undefined
def run_migrations_online(): """Run migrations for multiple databases"""
# Configuration for each database
engines = {
    'main': {
        'url': os.getenv('MAIN_DB_URL'),
        'target_metadata': main_metadata
    },
    'analytics': {
        'url': os.getenv('ANALYTICS_DB_URL'),
        'target_metadata': analytics_metadata
    }
}

for name, config in engines.items():
    logger.info(f"Running migrations for {name} database")

    engine = create_engine(config['url'])

    with engine.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=config['target_metadata'],
            upgrade_token=f"{name}_upgrade",
            downgrade_token=f"{name}_downgrade"
        )

        with context.begin_transaction():
            context.run_migrations(engine_name=name)
undefined

Troubleshooting

故障排除

Common Issues and Solutions

常见问题与解决方案

Multiple Heads Error
bash
undefined
多Head错误
bash
undefined

Problem: "Multiple heads exist"

Problem: "Multiple heads exist"

Solution: Merge the branches

Solution: Merge the branches

alembic merge heads -m "merge branches"

**Migration Out of Sync**
```bash
alembic merge heads -m "merge branches"

**迁移不同步**
```bash

Problem: Database revision doesn't match migration history

Problem: Database revision doesn't match migration history

Solution: Stamp database to specific revision

Solution: Stamp database to specific revision

alembic stamp head
alembic stamp head

Or stamp to specific revision

Or stamp to specific revision

alembic stamp abc123

**Failed Migration Cleanup**
```bash
alembic stamp abc123

**迁移失败后的清理**
```bash

Problem: Migration failed midway

Problem: Migration failed midway

Solution: Manual cleanup

Solution: Manual cleanup

1. Check current state

1. Check current state

alembic current
alembic current

2. Manually fix database issues

2. Manually fix database issues

psql $DATABASE_URL
psql $DATABASE_URL

3. Stamp to correct revision

3. Stamp to correct revision

alembic stamp previous_working_revision
alembic stamp previous_working_revision

4. Try migration again

4. Try migration again

alembic upgrade head

**Circular Dependencies**
```bash
alembic upgrade head

**循环依赖**
```bash

Problem: "Circular dependency detected"

Problem: "Circular dependency detected"

Solution: Use depends_on instead of down_revision

Solution: Use depends_on instead of down_revision

alembic revision -m "fix circular dependency"
--head=branch_a@head
--depends-on=branch_b_revision
undefined
alembic revision -m "fix circular dependency"
--head=branch_a@head
--depends-on=branch_b_revision
undefined

Summary

总结

This skill covered comprehensive Alembic usage for customer support systems:
  1. Setup: Installation, configuration, and initialization
  2. Creating Migrations: Manual and autogenerated approaches
  3. Data Migrations: Transforming data during schema changes
  4. Running Migrations: Upgrade, downgrade, and status commands
  5. Branching: Managing parallel development streams
  6. Testing: Unit, integration, and performance testing
  7. CI/CD: Automation and deployment strategies
  8. Production: Zero-downtime migrations and best practices
  9. Advanced: Custom templates and multi-database support
  10. Troubleshooting: Common issues and solutions
Always remember:
  • Review autogenerated migrations
  • Test migrations thoroughly before production
  • Keep backups before major migrations
  • Plan for rollback scenarios
  • Monitor migration performance
  • Document complex migrations
For more examples, see EXAMPLES.md in this skill package.
本Skill涵盖了客户支持系统中Alembic的全面使用方法:
  1. 设置:安装、配置与初始化
  2. 创建迁移:手动与自动生成两种方式
  3. 数据迁移:架构变更期间的数据转换
  4. 执行迁移:升级、回滚与状态命令
  5. 分支管理:并行开发流的管理
  6. 测试:单元、集成与性能测试
  7. CI/CD:自动化与部署策略
  8. 生产环境:零停机迁移与最佳实践
  9. 高级功能:自定义模板与多数据库支持
  10. 故障排除:常见问题与解决方案
请始终记住:
  • 评审自动生成的迁移
  • 生产部署前彻底测试迁移
  • 重大迁移前保留备份
  • 规划回滚场景
  • 监控迁移性能
  • 记录复杂迁移
更多示例,请查看本Skill包中的EXAMPLES.md。